Commit c27b430

raftstore & raftstore-v2: control grpc server according to slowness. (#15088)

close #15086

Make raftstore & raftstore-v2 able to control (pause/resume) the grpc service according to store slowness, as sketched below.

Signed-off-by: lucasliang <nkcs_lykx@hotmail.com>

Co-authored-by: tonyxuqqi <tonyxuqi@outlook.com>
LykxSassinator and tonyxuqqi committed Jul 17, 2023
1 parent de16ac0 commit c27b430
Showing 40 changed files with 726 additions and 55 deletions.
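
For orientation, the shape of the mechanism can be sketched as follows. This is an editorial sketch, not code from this commit: the real types live in the new components/service crate and the pipe uses tikv_util::mpsc, while the ServiceEvent variants and method signatures below are assumptions for illustration.

use std::sync::mpsc::{channel, Receiver, SendError, Sender};

// Hypothetical stand-in for the event type defined in components/service.
#[derive(Debug, Clone, Copy)]
enum ServiceEvent {
    PauseGrpc,
    ResumeGrpc,
}

// Stand-in for GrpcServiceManager: it holds only the sending half of the
// pipe, so any component (here, the PD heartbeat worker) can ask the gRPC
// server to pause or resume without holding a handle on the server itself.
#[derive(Clone)]
struct GrpcServiceManager {
    tx: Sender<ServiceEvent>,
}

impl GrpcServiceManager {
    fn pause(&self) -> Result<(), SendError<ServiceEvent>> {
        self.tx.send(ServiceEvent::PauseGrpc)
    }
    fn resume(&self) -> Result<(), SendError<ServiceEvent>> {
        self.tx.send(ServiceEvent::ResumeGrpc)
    }
}

fn main() {
    // Mirrors the pipe created in cmd/tikv-server/src/main.rs below.
    let (tx, rx): (Sender<ServiceEvent>, Receiver<ServiceEvent>) = channel();
    let manager = GrpcServiceManager { tx };

    // The PD worker reacts to a heartbeat response carrying a control op.
    manager.pause().expect("receiver alive");
    manager.resume().expect("receiver alive");

    // The server main loop drains the receiver and acts on each event.
    for event in rx.try_iter() {
        println!("server would now handle {:?}", event);
    }
}

Cloning the manager (as the PD worker does in the diff below) clones only the sender, which is what makes it cheap to hand one to every component that may need to throttle the server.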
20 changes: 19 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
@@ -128,6 +128,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_derive = "1.0"
serde_ignored = "0.1"
serde_json = { version = "1.0", features = ["preserve_order"] }
+service = { workspace = true }
slog = { workspace = true }
slog-global = { workspace = true }
smallvec = "1.4"
@@ -254,6 +255,7 @@ members = [
"components/resource_metering",
"components/security",
"components/server",
"components/service",
"components/snap_recovery",
"components/sst_importer",
"components/test_backup",
@@ -331,6 +333,7 @@ resource_control = { path = "components/resource_control" }
resource_metering = { path = "components/resource_metering" }
security = { path = "components/security" }
server = { path = "components/server" }
+service = { path = "components/service" }
snap_recovery = { path = "components/snap_recovery" }
sst_importer = { path = "components/sst_importer", default-features = false }
test_backup = { path = "components/test_backup" }
7 changes: 5 additions & 2 deletions cmd/tikv-server/src/main.rs
@@ -217,8 +217,11 @@ fn main() {
process::exit(1)
}

+let (service_event_tx, service_event_rx) = tikv_util::mpsc::unbounded(); // pipe for controling service
match config.storage.engine {
-EngineType::RaftKv => server::server::run_tikv(config),
-EngineType::RaftKv2 => server::server2::run_tikv(config),
+EngineType::RaftKv => server::server::run_tikv(config, service_event_tx, service_event_rx),
+EngineType::RaftKv2 => {
+server::server2::run_tikv(config, service_event_tx, service_event_rx)
+}
}
}
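
The receiving half created above is handed to run_tikv and ultimately to the server's event loop. Continuing the sketch (again an assumption — the real consumer lives in components/server, which this excerpt only references):

// A server loop that tracks paused state and ignores redundant
// transitions, so repeated PAUSE ops from PD heartbeats are harmless.
fn handle_service_events(rx: std::sync::mpsc::Receiver<ServiceEvent>) {
    let mut paused = false;
    while let Ok(event) = rx.recv() {
        match event {
            ServiceEvent::PauseGrpc if !paused => {
                // A real implementation would stop accepting gRPC traffic here.
                paused = true;
            }
            ServiceEvent::ResumeGrpc if paused => {
                // ...and rebuild/restart the gRPC server here.
                paused = false;
            }
            // Already in the requested state: nothing to do.
            _ => {}
        }
    }
}

The paused flag also explains the stats.set_is_grpc_paused(...) call in the pd/store.rs hunk further down: the store reports its current state back to PD on every heartbeat, closing the control loop.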
1 change: 1 addition & 0 deletions components/raftstore-v2/Cargo.toml
@@ -55,6 +55,7 @@ raftstore = { workspace = true }
rand = "0.8.3"
resource_control = { workspace = true }
resource_metering = { workspace = true }
+service = { workspace = true }
slog = "2.3"
smallvec = "1.4"
sst_importer = { workspace = true }
3 changes: 3 additions & 0 deletions components/raftstore-v2/src/batch/store.rs
@@ -40,6 +40,7 @@ use raftstore::{
},
};
use resource_metering::CollectorRegHandle;
+use service::service_manager::GrpcServiceManager;
use slog::{warn, Logger};
use sst_importer::SstImporter;
use tikv_util::{
@@ -678,6 +679,7 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
pd_worker: LazyWorker<pd::Task>,
sst_importer: Arc<SstImporter>,
key_manager: Option<Arc<DataKeyManager>>,
+grpc_service_mgr: GrpcServiceManager,
) -> Result<()>
where
T: Transport + 'static,
@@ -793,6 +795,7 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
auto_split_controller,
store_meta.lock().unwrap().region_read_progress.clone(),
collector_reg_handle,
+grpc_service_mgr,
self.logger.clone(),
self.shutdown.clone(),
cfg.clone(),
6 changes: 6 additions & 0 deletions components/raftstore-v2/src/worker/pd/mod.rs
@@ -19,6 +19,7 @@ use raftstore::store::{
WriteStats, NUM_COLLECT_STORE_INFOS_PER_HEARTBEAT,
};
use resource_metering::{Collector, CollectorRegHandle, RawRecords};
+use service::service_manager::GrpcServiceManager;
use slog::{error, warn, Logger};
use tikv_util::{
config::VersionTrack,
@@ -217,6 +218,9 @@
// For slowness detection
slowness_stats: slowness::SlownessStatistics,

+// For grpc server.
+grpc_service_manager: GrpcServiceManager,

logger: Logger,
shutdown: Arc<AtomicBool>,
cfg: Arc<VersionTrack<Config>>,
@@ -242,6 +246,7 @@
auto_split_controller: AutoSplitController,
region_read_progress: RegionReadProgressRegistry,
collector_reg_handle: CollectorRegHandle,
+grpc_service_manager: GrpcServiceManager,
logger: Logger,
shutdown: Arc<AtomicBool>,
cfg: Arc<VersionTrack<Config>>,
@@ -279,6 +284,7 @@
concurrency_manager,
causal_ts_provider,
slowness_stats,
+grpc_service_manager,
logger,
shutdown,
cfg,
24 changes: 24 additions & 0 deletions components/raftstore-v2/src/worker/pd/store.rs
@@ -231,6 +231,8 @@
stats.set_cpu_usages(self.store_stat.store_cpu_usages.clone().into());
stats.set_read_io_rates(self.store_stat.store_read_io_rates.clone().into());
stats.set_write_io_rates(self.store_stat.store_write_io_rates.clone().into());
+// Update grpc server status
+stats.set_is_grpc_paused(self.grpc_service_manager.is_paused());

let mut interval = pdpb::TimeInterval::default();
interval.set_start_timestamp(self.store_stat.last_report_ts.into_inner());
@@ -268,6 +270,7 @@

let resp = self.pd_client.store_heartbeat(stats, None, None);
let logger = self.logger.clone();
+let mut grpc_service_manager = self.grpc_service_manager.clone();
let f = async move {
match resp.await {
Ok(mut resp) => {
@@ -282,6 +285,27 @@
"Ignored AwakenRegions in raftstore-v2 as no hibernated regions in raftstore-v2"
);
}
+// Control grpc server.
+else if let Some(op) = resp.control_grpc.take() {
+info!(logger, "forcely control grpc server";
+"is_grpc_server_paused" => grpc_service_manager.is_paused(),
+"event" => ?op,
+);
+match op.get_ctrl_event() {
+pdpb::ControlGrpcEvent::Pause => {
+if let Err(e) = grpc_service_manager.pause() {
+warn!(logger, "failed to send service event to PAUSE grpc server";
+"err" => ?e);
+}
+}
+pdpb::ControlGrpcEvent::Resume => {
+if let Err(e) = grpc_service_manager.resume() {
+warn!(logger, "failed to send service event to RESUME grpc server";
+"err" => ?e);
+}
+}
+}
+}
}
Err(e) => {
error!(logger, "store heartbeat failed"; "err" => ?e);
2 changes: 2 additions & 0 deletions components/raftstore-v2/tests/integrations/cluster.rs
@@ -45,6 +45,7 @@ use raftstore_v2::{
Bootstrap, SimpleWriteEncoder, StateStorage, StoreSystem,
};
use resource_metering::CollectorRegHandle;
+use service::service_manager::GrpcServiceManager;
use slog::{debug, o, Logger};
use sst_importer::SstImporter;
use tempfile::TempDir;
@@ -353,6 +354,7 @@ impl RunningState {
pd_worker,
importer,
key_manager,
+GrpcServiceManager::dummy(),
)
.unwrap();

1 change: 1 addition & 0 deletions components/raftstore/Cargo.toml
@@ -77,6 +77,7 @@ resource_metering = { workspace = true }
serde = "1.0"
serde_derive = "1.0"
serde_with = "1.4"
+service = { workspace = true }
slog = { workspace = true }
slog-global = { workspace = true }
smallvec = "1.4"
4 changes: 3 additions & 1 deletion components/raftstore/src/store/fsm/peer.rs
@@ -2824,7 +2824,9 @@ where
}
}
// It's v2 only message and ignore does no harm.
-ExtraMessageType::MsgGcPeerResponse | ExtraMessageType::MsgFlushMemtable => (),
+ExtraMessageType::MsgGcPeerResponse
+| ExtraMessageType::MsgFlushMemtable
+| ExtraMessageType::MsgRefreshBuckets => (),
}
}

5 changes: 5 additions & 0 deletions components/raftstore/src/store/fsm/store.rs
@@ -45,6 +45,7 @@ use protobuf::Message;
use raft::StateRole;
use resource_control::{channel::unbounded, ResourceGroupManager};
use resource_metering::CollectorRegHandle;
+use service::service_manager::GrpcServiceManager;
use sst_importer::SstImporter;
use tikv_alloc::trace::TraceEvent;
use tikv_util::{
@@ -1576,6 +1577,7 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
collector_reg_handle: CollectorRegHandle,
health_service: Option<HealthService>,
causal_ts_provider: Option<Arc<CausalTsProviderImpl>>, // used for rawkv apiv2
+grpc_service_mgr: GrpcServiceManager,
) -> Result<()> {
assert!(self.workers.is_none());
// TODO: we can get cluster meta regularly too later.
@@ -1715,6 +1717,7 @@
health_service,
causal_ts_provider,
snap_generator_pool,
+grpc_service_mgr,
)?;
Ok(())
}
@@ -1733,6 +1736,7 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
health_service: Option<HealthService>,
causal_ts_provider: Option<Arc<CausalTsProviderImpl>>, // used for rawkv apiv2
snap_generator_pool: FuturePool,
+grpc_service_mgr: GrpcServiceManager,
) -> Result<()> {
let cfg = builder.cfg.value().clone();
let store = builder.store.clone();
@@ -1824,6 +1828,7 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
health_service,
coprocessor_host,
causal_ts_provider,
+grpc_service_mgr,
);
assert!(workers.pd_worker.start_with_timer(pd_runner));

