Skip to content

Commit

Permalink
make PdClient dynamic (#11)
Browse files Browse the repository at this point in the history
* make PdClient dynamic

Make it easier to write unit test.

Signed-off-by: Evan Zhou <coocood@gmail.com>
  • Loading branch information
coocood committed May 30, 2022
1 parent ac7577b commit 50648a9
Show file tree
Hide file tree
Showing 19 changed files with 149 additions and 157 deletions.
6 changes: 5 additions & 1 deletion cmd/tikv-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use std::{path::Path, process};

use clap::{crate_authors, App, Arg};
use cloud_server::{signal_handler, TiKVServer};
use server::setup::{ensure_no_unrecognized_config, validate_and_persist_config};
use tikv::config::TiKvConfig;

Expand Down Expand Up @@ -185,5 +186,8 @@ fn main() {
println!("config check successful");
process::exit(0)
}
cloud_server::run_tikv(config);
let mut tikv = TiKVServer::new(config);
tikv.run();
signal_handler::wait_for_signal();
tikv.stop();
}
10 changes: 1 addition & 9 deletions components/cloud_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,14 @@ mod status_server;
mod tikv_server;
mod transport;

pub use tikv_server::run_tikv;
pub use tikv_server::*;

#[cfg(test)]
mod tests {
use tikv::config::TiKvConfig;

use crate::tikv_server::run_tikv;

#[test]
fn test_run() {
println!("run")
}

#[test]
fn test_run_tikv() {
let config = TiKvConfig::default();
run_tikv(config);
}
}
13 changes: 5 additions & 8 deletions components/cloud_server/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,29 +71,26 @@ pub fn create_raft_storage<R: FlowStatsReporter, F: KvFormat>(

/// A wrapper for the raftstore which runs Multi-Raft.
// TODO: we will rename another better name like RaftStore later.
pub struct Node<C: PdClient + 'static> {
pub struct Node {
cluster_id: u64,
store: metapb::Store,
store_cfg: Arc<VersionTrack<StoreConfig>>,
system: RaftBatchSystem,
has_started: bool,

pd_client: Arc<C>,
pd_client: Arc<dyn PdClient>,
bg_worker: Worker,
}

impl<C> Node<C>
where
C: PdClient,
{
impl Node {
/// Creates a new Node.
pub fn new(
system: RaftBatchSystem,
cfg: &ServerConfig,
store_cfg: Arc<VersionTrack<StoreConfig>>,
pd_client: Arc<C>,
pd_client: Arc<dyn PdClient>,
bg_worker: Worker,
) -> Node<C> {
) -> Node {
let mut store = metapb::Store::default();
store.set_id(INVALID_ID);
if cfg.advertise_addr.is_empty() {
Expand Down
18 changes: 7 additions & 11 deletions components/cloud_server/src/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,17 @@ struct StoreAddr {
}

/// A runner for resolving store addresses.
struct Runner<T, RR>
struct Runner<RR>
where
T: PdClient,
RR: RaftStoreRouter,
{
pd_client: Arc<T>,
pd_client: Arc<dyn PdClient>,
store_addrs: HashMap<u64, StoreAddr>,
router: RR,
}

impl<T, RR> Runner<T, RR>
impl<RR> Runner<RR>
where
T: PdClient,
RR: RaftStoreRouter,
{
fn resolve(&mut self, store_id: u64) -> Result<String> {
Expand Down Expand Up @@ -112,9 +110,8 @@ where
}
}

impl<T, RR> Runnable for Runner<T, RR>
impl<RR> Runnable for Runner<RR>
where
T: PdClient,
RR: RaftStoreRouter,
{
type Task = Task;
Expand All @@ -138,13 +135,12 @@ impl PdStoreAddrResolver {
}

/// Creates a new `PdStoreAddrResolver`.
pub fn new_resolver<T, RR: 'static>(
pd_client: Arc<T>,
pub fn new_resolver<RR: 'static>(
pd_client: Arc<dyn PdClient>,
worker: &Worker,
router: RR,
) -> PdStoreAddrResolver
where
T: PdClient + 'static,
RR: RaftStoreRouter,
{
let runner = Runner {
Expand Down Expand Up @@ -220,7 +216,7 @@ mod tests {
store
}

fn new_runner(store: metapb::Store) -> Runner<MockPdClient, RaftStoreBlackHole> {
fn new_runner(store: metapb::Store) -> Runner<RaftStoreBlackHole> {
let client = MockPdClient {
start: Instant::now(),
store,
Expand Down
3 changes: 1 addition & 2 deletions components/cloud_server/src/signal_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ pub use self::imp::wait_for_signal;

#[cfg(unix)]
mod imp {
use engine_traits::Engines;
use libc::c_int;
use nix::sys::signal::{SIGHUP, SIGINT, SIGTERM, SIGUSR1, SIGUSR2};
use signal::trap::Trap;

#[allow(dead_code)]
pub fn wait_for_signal(_engines: Option<Engines<kvengine::Engine, rfengine::RfEngine>>) {
pub fn wait_for_signal() {
let trap = Trap::trap(&[SIGTERM, SIGINT, SIGHUP, SIGUSR1, SIGUSR2]);
for sig in trap {
match sig {
Expand Down
119 changes: 66 additions & 53 deletions components/cloud_server/src/tikv_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,62 +80,18 @@ use crate::{
status_server::StatusServer,
};

/// Run a TiKV server. Returns when the server is shutdown by the user, in which
/// case the server will be properly stopped.
pub fn run_tikv(config: TiKvConfig) {
// Sets the global logger ASAP.
// It is okay to use the config w/o `validate()`,
// because `initial_logger()` handles various conditions.
initial_logger(&config);

// Print version information.
let build_timestamp = option_env!("TIKV_BUILD_TIME");
tikv::log_tikv_info(build_timestamp);

// Print resource quota.
SysQuota::log_quota();
CPU_CORES_QUOTA_GAUGE.set(SysQuota::cpu_cores_quota());

// Do some prepare works before start.
pre_start();

let _m = Monitor::default();

let mut tikv = TiKVServer::new(config);
info!("created tikv server");

// Must be called after `TiKVServer::init`.
let memory_limit = tikv.config.memory_usage_limit.unwrap().0;
let high_water = (tikv.config.memory_usage_high_water * memory_limit as f64) as u64;
register_memory_usage_high_water(high_water);

tikv.check_conflict_addr();
tikv.init_fs();
tikv.init_yatp();
tikv.init_encryption();
// TODO(x) io limiter and metrics flusher
tikv.init_engines();
let server_config = tikv.init_servers::<ApiV1>();
tikv.register_services();
tikv.run_server(server_config);
tikv.run_status_server();

signal_handler::wait_for_signal(Some(tikv.raw_engines.clone().into()));
tikv.stop();
}

const RESERVED_OPEN_FDS: u64 = 1000;

const DEFAULT_METRICS_FLUSH_INTERVAL: Duration = Duration::from_millis(10_000);
const DEFAULT_ENGINE_METRICS_RESET_INTERVAL: Duration = Duration::from_millis(60_000);
const DEFAULT_STORAGE_STATS_INTERVAL: Duration = Duration::from_secs(1);

/// A complete TiKV server.
struct TiKVServer {
pub struct TiKVServer {
config: TiKvConfig,
cfg_controller: Option<ConfigController>,
security_mgr: Arc<SecurityManager>,
pd_client: Arc<RpcClient>,
pd_client: Arc<dyn PdClient>,
system: Option<RaftBatchSystem>,
router: RaftRouter,
resolver: resolve::PdStoreAddrResolver,
Expand All @@ -162,11 +118,35 @@ struct TiKVEngines {
struct Servers {
lock_mgr: LockManager,
server: Server<RaftRouter, resolve::PdStoreAddrResolver>,
node: Node<RpcClient>,
node: Node,
}

impl TiKVServer {
fn new(mut config: TiKvConfig) -> TiKVServer {
pub fn new(mut config: TiKvConfig) -> TiKVServer {
let (security_mgr, env, pd) = Self::prepare(&mut config);
Self::setup(config, security_mgr, env, pd)
}

pub fn prepare(
config: &mut TiKvConfig,
) -> (Arc<SecurityManager>, Arc<Environment>, Arc<dyn PdClient>) {
// Sets the global logger ASAP.
// It is okay to use the config w/o `validate()`,
// because `initial_logger()` handles various conditions.
initial_logger(config);

// Print version information.
let build_timestamp = option_env!("TIKV_BUILD_TIME");
tikv::log_tikv_info(build_timestamp);

// Print resource quota.
SysQuota::log_quota();
CPU_CORES_QUOTA_GAUGE.set(SysQuota::cpu_cores_quota());

// Do some prepare works before start.
pre_start();

let _m = Monitor::default();
tikv_util::thread_group::set_properties(Some(GroupProperties::default()));
// It is okay use pd config and security config before `init_config`,
// because these configs must be provided by command line, and only
Expand All @@ -182,8 +162,16 @@ impl TiKVServer {
.build(),
);
let pd_client =
Self::connect_to_pd_cluster(&mut config, env.clone(), Arc::clone(&security_mgr));
TiKVServer::connect_to_pd_cluster(config, env.clone(), Arc::clone(&security_mgr));
(security_mgr, env, pd_client)
}

pub fn setup(
mut config: TiKvConfig,
security_mgr: Arc<SecurityManager>,
env: Arc<Environment>,
pd_client: Arc<dyn PdClient>,
) -> TiKVServer {
// Initialize and check config
let cfg_controller = Self::init_config(config);
let config = cfg_controller.get_current();
Expand Down Expand Up @@ -218,7 +206,7 @@ impl TiKVServer {
config.quota.foreground_read_bandwidth,
config.quota.max_delay_duration,
));

info!("created tikv server");
TiKVServer {
config,
cfg_controller: Some(cfg_controller),
Expand All @@ -243,6 +231,23 @@ impl TiKVServer {
}
}

pub fn run(&mut self) {
let memory_limit = self.config.memory_usage_limit.unwrap().0;
let high_water = (self.config.memory_usage_high_water * memory_limit as f64) as u64;
register_memory_usage_high_water(high_water);

self.check_conflict_addr();
self.init_fs();
self.init_yatp();
self.init_encryption();
// TODO(x) io limiter and metrics flusher
self.init_engines();
let server_config = self.init_servers::<ApiV1>();
self.register_services();
self.run_server(server_config);
self.run_status_server();
}

/// Initialize and check the config
///
/// Warnings are logged and fatal errors exist.
Expand Down Expand Up @@ -691,7 +696,7 @@ impl TiKVServer {
}
}

fn stop(self) {
pub fn stop(self) {
tikv_util::thread_group::mark_shutdown();
let mut servers = self.servers.unwrap();
servers
Expand All @@ -706,10 +711,18 @@ impl TiKVServer {

self.to_stop.into_iter().for_each(|s| s.stop());
}

pub fn get_kv_engine(&self) -> kvengine::Engine {
self.raw_engines.kv.clone()
}

pub fn get_raft_engine(&self) -> rfengine::RfEngine {
self.raw_engines.raft.clone()
}
}

impl TiKVServer {
fn init_raw_engines(pd: Arc<pd_client::RpcClient>, conf: &TiKvConfig) -> Engines {
fn init_raw_engines(pd: Arc<dyn pd_client::PdClient>, conf: &TiKvConfig) -> Engines {
// Create raft engine.
let raft_db_path = Path::new(&conf.raft_store.raftdb_path);
let kv_engine_path = PathBuf::from(&conf.storage.data_dir).join(Path::new("db"));
Expand Down Expand Up @@ -796,7 +809,7 @@ fn convert_compression_type(tp: DBCompressionType) -> u8 {
}

struct PdIDAllocator {
pd: Arc<pd_client::RpcClient>,
pd: Arc<dyn pd_client::PdClient>,
}

impl kvengine::IDAllocator for PdIDAllocator {
Expand Down
11 changes: 6 additions & 5 deletions components/pd_client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,10 +652,11 @@ impl PdClient for RpcClient {
.execute()
}

fn handle_region_heartbeat_response<F>(&self, _: u64, f: F) -> PdFuture<()>
where
F: Fn(pdpb::RegionHeartbeatResponse) + Send + 'static,
{
fn handle_region_heartbeat_response(
&self,
_: u64,
f: Box<dyn Fn(pdpb::RegionHeartbeatResponse) + Send + 'static>,
) -> PdFuture<()> {
self.pd_client.handle_region_heartbeat_response(f)
}

Expand Down Expand Up @@ -822,7 +823,7 @@ impl PdClient for RpcClient {
check_resp_header(resp.get_header())
}

fn handle_reconnect<F: Fn() + Sync + Send + 'static>(&self, f: F) {
fn handle_reconnect(&self, f: Box<dyn Fn() + Sync + Send + 'static>) {
self.pd_client.on_reconnect(Box::new(f))
}

Expand Down
12 changes: 6 additions & 6 deletions components/pd_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,11 +338,11 @@ pub trait PdClient: Send + Sync {
/// Gets a stream of Region heartbeat response.
///
/// Please note that this method should only be called once.
fn handle_region_heartbeat_response<F>(&self, _store_id: u64, _f: F) -> PdFuture<()>
where
Self: Sized,
F: Fn(pdpb::RegionHeartbeatResponse) + Send + 'static,
{
fn handle_region_heartbeat_response(
&self,
_store_id: u64,
_f: Box<dyn Fn(pdpb::RegionHeartbeatResponse) + Send + 'static>,
) -> PdFuture<()> {
unimplemented!();
}

Expand Down Expand Up @@ -383,7 +383,7 @@ pub trait PdClient: Send + Sync {
/// Registers a handler to the client, which will be invoked after reconnecting to PD.
///
/// Please note that this method should only be called once.
fn handle_reconnect<F: Fn() + Sync + Send + 'static>(&self, _: F)
fn handle_reconnect(&self, _: Box<dyn Fn() + Sync + Send + 'static>)
where
Self: Sized,
{
Expand Down
Loading

0 comments on commit 50648a9

Please sign in to comment.