Skip to content

Commit

Permalink
raftstore-v2: store heartbeat supports write keys and bytes. (#14271)
Browse files Browse the repository at this point in the history
ref #12842

1. store heartbeat supports write keys and bytes.

Signed-off-by: bufferflies <1045931706@qq.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
bufferflies and ti-chi-bot committed Mar 2, 2023
1 parent 4f2430d commit d74fd13
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 19 deletions.
13 changes: 12 additions & 1 deletion components/raftstore-v2/src/batch/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use raft::{StateRole, INVALID_ID};
use raftstore::{
coprocessor::{CoprocessorHost, RegionChangeEvent},
store::{
fsm::store::{PeerTickBatch, ENTRY_CACHE_EVICT_TICK_DURATION},
fsm::{
store::{PeerTickBatch, ENTRY_CACHE_EVICT_TICK_DURATION},
GlobalStoreStat, LocalStoreStat,
},
local_metrics::RaftMetrics,
AutoSplitController, Config, ReadRunner, ReadTask, SplitCheckRunner, SplitCheckTask,
StoreWriters, TabletSnapManager, Transport, WriteSenders,
Expand Down Expand Up @@ -85,6 +88,8 @@ pub struct StoreContext<EK: KvEngine, ER: RaftEngine, T> {
pub self_disk_usage: DiskUsage,

pub snap_mgr: TabletSnapManager,
pub global_stat: GlobalStoreStat,
pub store_stat: LocalStoreStat,
pub sst_importer: Arc<SstImporter>,
}

Expand Down Expand Up @@ -162,6 +167,7 @@ impl<EK: KvEngine, ER: RaftEngine, T> StorePoller<EK, ER, T> {
fn flush_events(&mut self) {
self.schedule_ticks();
self.poll_ctx.raft_metrics.maybe_flush();
self.poll_ctx.store_stat.flush();
}

fn schedule_ticks(&mut self) {
Expand Down Expand Up @@ -279,6 +285,7 @@ struct StorePollerBuilder<EK: KvEngine, ER: RaftEngine, T> {
store_meta: Arc<Mutex<StoreMeta<EK>>>,
shutdown: Arc<AtomicBool>,
snap_mgr: TabletSnapManager,
global_stat: GlobalStoreStat,
sst_importer: Arc<SstImporter>,
}

Expand Down Expand Up @@ -308,6 +315,7 @@ impl<EK: KvEngine, ER: RaftEngine, T> StorePollerBuilder<EK, ER, T> {
.after_start(move || set_io_type(IoType::ForegroundWrite))
.name_prefix("apply")
.build_future_pool();
let global_stat = GlobalStoreStat::default();
StorePollerBuilder {
cfg,
store_id,
Expand All @@ -322,6 +330,7 @@ impl<EK: KvEngine, ER: RaftEngine, T> StorePollerBuilder<EK, ER, T> {
snap_mgr,
shutdown,
coprocessor_host,
global_stat,
sst_importer,
}
}
Expand Down Expand Up @@ -440,6 +449,8 @@ where
self_disk_usage: DiskUsage::Normal,
snap_mgr: self.snap_mgr.clone(),
coprocessor_host: self.coprocessor_host.clone(),
global_stat: self.global_stat.clone(),
store_stat: self.global_stat.local(),
sst_importer: self.sst_importer.clone(),
};
poll_ctx.update_ticks_timeout();
Expand Down
2 changes: 2 additions & 0 deletions components/raftstore-v2/src/operation/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
.add_bucket_flow(&apply_res.bucket_stat);
self.update_split_flow_control(&apply_res.metrics);
self.update_stat(&apply_res.metrics);
ctx.store_stat.engine_total_bytes_written += apply_res.metrics.written_bytes;
ctx.store_stat.engine_total_keys_written += apply_res.metrics.written_keys;

self.raft_group_mut()
.advance_apply_to(apply_res.applied_index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ impl SimpleWriteBinary {
pub fn freeze(&mut self) {
self.write_type = WriteType::Unspecified;
}

#[inline]
pub fn data_size(&self) -> usize {
self.buf.len()
}
}

/// We usually use `RaftCmdRequest` for read write request. But the codec is
Expand Down
16 changes: 14 additions & 2 deletions components/raftstore-v2/src/operation/pd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

//! This module implements the interactions with pd.

use std::sync::atomic::Ordering;

use engine_traits::{KvEngine, RaftEngine};
use fail::fail_point;
use kvproto::{metapb, pdpb};
Expand Down Expand Up @@ -47,8 +49,18 @@ impl Store {

stats.set_start_time(self.start_time().unwrap() as u32);

stats.set_bytes_written(0);
stats.set_keys_written(0);
stats.set_bytes_written(
ctx.global_stat
.stat
.engine_total_bytes_written
.swap(0, Ordering::Relaxed),
);
stats.set_keys_written(
ctx.global_stat
.stat
.engine_total_keys_written
.swap(0, Ordering::Relaxed),
);
stats.set_is_busy(false);
// TODO: add query stats
let task = pd::Task::StoreHeartbeat { stats };
Expand Down
37 changes: 27 additions & 10 deletions components/raftstore-v2/tests/integrations/test_pd_heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use kvproto::raft_cmdpb::{RaftCmdRequest, StatusCmdType};
use pd_client::PdClient;
use raftstore::coprocessor::Config as CopConfig;
use raftstore_v2::{
router::{PeerMsg, PeerTick},
router::{PeerMsg, PeerTick, StoreMsg, StoreTick},
SimpleWriteEncoder,
};
use tikv_util::{config::ReadableSize, store::new_peer};
Expand Down Expand Up @@ -54,18 +54,35 @@ fn test_region_heartbeat() {

#[test]
fn test_store_heartbeat() {
let region_id = 2;
let cluster = Cluster::with_node_count(1, None);
let store_id = cluster.node(0).id();
for _ in 0..5 {
let stats = block_on(cluster.node(0).pd_client().get_store_stats_async(store_id)).unwrap();
if stats.get_start_time() > 0 {
assert_ne!(stats.get_capacity(), 0);
assert_ne!(stats.get_used_size(), 0);
return;
}
std::thread::sleep(std::time::Duration::from_millis(50));
let router = &cluster.routers[0];
// load data to split bucket.
let header = Box::new(router.new_request_for(region_id).take_header());
let mut put = SimpleWriteEncoder::with_capacity(64);
put.put(CF_DEFAULT, b"key", b"value");
let data = put.encode();
let write_bytes = data.data_size();
let (msg, sub) = PeerMsg::simple_write(header, data);
router.send(region_id, msg).unwrap();
let _resp = block_on(sub.result()).unwrap();

// report store heartbeat to pd.
std::thread::sleep(std::time::Duration::from_millis(50));
router
.store_router()
.send_control(StoreMsg::Tick(StoreTick::PdStoreHeartbeat))
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(50));

let stats = block_on(cluster.node(0).pd_client().get_store_stats_async(store_id)).unwrap();
if stats.get_start_time() > 0 {
assert_ne!(stats.get_capacity(), 0);
assert_ne!(stats.get_used_size(), 0);
assert_eq!(stats.get_keys_written(), 1);
assert!(stats.get_bytes_written() > write_bytes.try_into().unwrap());
}
panic!("failed to get store stats");
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions components/raftstore/src/store/fsm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub use self::{
ChangePeer, ExecResult, GenSnapTask, Msg as ApplyTask, Notifier as ApplyNotifier, Proposal,
Registration, SwitchWitness, TaskRes as ApplyTaskRes,
},
metrics::{GlobalStoreStat, LocalStoreStat},
peer::{new_admin_request, DestroyPeerJob, PeerFsm, MAX_PROPOSAL_SIZE_RATIO},
store::{
create_raft_batch_system, RaftBatchSystem, RaftPollerBuilder, RaftRouter, StoreInfo,
Expand Down
12 changes: 6 additions & 6 deletions components/raftstore/src/store/fsm/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2479,22 +2479,22 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
.global_stat
.stat
.engine_total_bytes_written
.swap(0, Ordering::SeqCst),
.swap(0, Ordering::Relaxed),
);
stats.set_keys_written(
self.ctx
.global_stat
.stat
.engine_total_keys_written
.swap(0, Ordering::SeqCst),
.swap(0, Ordering::Relaxed),
);

stats.set_is_busy(
self.ctx
.global_stat
.stat
.is_busy
.swap(false, Ordering::SeqCst),
.swap(false, Ordering::Relaxed),
);

let mut query_stats = QueryStats::default();
Expand All @@ -2503,21 +2503,21 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
.global_stat
.stat
.engine_total_query_put
.swap(0, Ordering::SeqCst),
.swap(0, Ordering::Relaxed),
);
query_stats.set_delete(
self.ctx
.global_stat
.stat
.engine_total_query_delete
.swap(0, Ordering::SeqCst),
.swap(0, Ordering::Relaxed),
);
query_stats.set_delete_range(
self.ctx
.global_stat
.stat
.engine_total_query_delete_range
.swap(0, Ordering::SeqCst),
.swap(0, Ordering::Relaxed),
);
stats.set_query_stats(query_stats);

Expand Down

0 comments on commit d74fd13

Please sign in to comment.