Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(storage): monitor compacting task count per level #8681

Merged
merged 8 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions grafana/risingwave-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ def section_compaction(outer_panels):
"num of SSTs written into next level during history compactions to next level",
[
panels.target(
f"sum({metric('storage_level_compact_write')}) / sum({metric('state_store_write_build_l0_bytes')})",
f"sum({metric('storage_level_compact_write')}) / sum({metric('compactor_write_build_l0_bytes')})",
"write amplification",
),
],
Expand All @@ -705,13 +705,21 @@ def section_compaction(outer_panels):
"L{{level_index}}"),
],
),
panels.timeseries_count(
"Compacting Task Count",
"num of compact_task",
[
panels.target(f"{metric('storage_level_compact_task_cnt')}",
"{{task}}"),
],
),
panels.timeseries_bytes_per_sec(
"KBs Read from Next Level",
"",
[
panels.target(
f"sum(rate({metric('storage_level_compact_read_next')}[$__rate_interval])) by (le, level_index)",
"L{{level_index}} read",
f"sum(rate({metric('storage_level_compact_read_next')}[$__rate_interval])) by (le, group, level_index)",
"cg{{group}}-L{{level_index}} read",
),
],
),
Expand All @@ -720,8 +728,8 @@ def section_compaction(outer_panels):
"",
[
panels.target(
f"sum(rate({metric('storage_level_compact_read_curr')}[$__rate_interval])) by (le, level_index)",
"L{{level_index}} read",
f"sum(rate({metric('storage_level_compact_read_curr')}[$__rate_interval])) by (le, group, level_index)",
"cg{{group}}-L{{level_index}} read",
),
],
),
Expand All @@ -730,8 +738,8 @@ def section_compaction(outer_panels):
"",
[
panels.target(
f"sum(rate({metric('storage_level_compact_read_sstn_curr')}[$__rate_interval])) by (le, level_index)",
"L{{level_index}} read",
f"sum(rate({metric('storage_level_compact_read_sstn_curr')}[$__rate_interval])) by (le, group, level_index)",
"cg{{group}}-L{{level_index}} read",
),
],
),
Expand All @@ -740,8 +748,8 @@ def section_compaction(outer_panels):
"",
[
panels.target(
f"sum(rate({metric('storage_level_compact_write')}[$__rate_interval])) by (le, level_index)",
"L{{level_index}} write",
f"sum(rate({metric('storage_level_compact_write')}[$__rate_interval])) by (le, group, level_index)",
"cg{{group}}-L{{level_index}} write",
),
],
),
Expand All @@ -750,8 +758,8 @@ def section_compaction(outer_panels):
"",
[
panels.target(
f"sum(rate({metric('storage_level_compact_write_sstn')}[$__rate_interval])) by (le, level_index)",
"L{{level_index}} write",
f"sum(rate({metric('storage_level_compact_write_sstn')}[$__rate_interval])) by (le, group, level_index)",
"cg{{group}}-L{{level_index}} write",
),
],
),
Expand All @@ -760,8 +768,8 @@ def section_compaction(outer_panels):
"num of SSTs read from next level during history compactions to next level",
[
panels.target(
f"sum(rate({metric('storage_level_compact_read_sstn_next')}[$__rate_interval])) by (le, level_index)",
"L{{level_index}} read",
f"sum(rate({metric('storage_level_compact_read_sstn_next')}[$__rate_interval])) by (le, group, level_index)",
"cg{{group}}-L{{level_index}} read",
),
],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions src/meta/src/hummock/level_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ impl LevelHandler {
.map(|task| task.task_id)
.collect_vec()
}

pub fn get_pending_tasks(&self) -> Vec<RunningCompactTask> {
self.pending_tasks.clone()
}
}

impl From<&LevelHandler> for risingwave_pb::hummock::LevelHandler {
Expand Down
32 changes: 30 additions & 2 deletions src/meta/src/hummock/metrics_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
Expand Down Expand Up @@ -89,9 +89,10 @@ pub fn trigger_sst_stat(
level_sst_size / 1024
};

let mut compacting_task_stat: BTreeMap<(usize, usize), usize> = BTreeMap::default();
for idx in 0..current_version.num_levels(compaction_group_id) {
let sst_num = level_sst_cnt(idx);
let level_label = format!("{}_{}", idx, compaction_group_id);
let level_label = format!("cg{}_L{}", compaction_group_id, idx);
metrics
.level_sst_num
.with_label_values(&[&level_label])
Expand All @@ -106,9 +107,36 @@ pub fn trigger_sst_stat(
.level_compact_cnt
.with_label_values(&[&level_label])
.set(compact_cnt as i64);

let compacting_task = compact_status.level_handlers[idx].get_pending_tasks();
let mut pending_task_ids: HashSet<u64> = HashSet::default();
for task in compacting_task {
if pending_task_ids.contains(&task.task_id) {
continue;
}

if idx != 0 && idx == task.target_level as usize {
continue;
}

let key = (idx, task.target_level as usize);
let count = compacting_task_stat.entry(key).or_insert(0);
*count += 1;

pending_task_ids.insert(task.task_id);
}
}
}

tracing::debug!("LSM Compacting STAT {:?}", compacting_task_stat);
for ((select, target), compacting_task_count) in compacting_task_stat {
let label_str = format!("cg{} L{} -> L{}", compaction_group_id, select, target);
metrics
.level_compact_task_cnt
.with_label_values(&[&label_str])
.set(compacting_task_count as _);
}

let level_label = format!("cg{}_l0_sub", compaction_group_id);
let sst_num = current_version
.levels
Expand Down
9 changes: 9 additions & 0 deletions src/meta/src/rpc/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ pub struct MetaMetrics {
/// The number of compactor CPU need to be scale.
pub scale_compactor_core_num: IntGauge,

pub level_compact_task_cnt: IntGaugeVec,
pub object_store_metric: Arc<ObjectStoreMetrics>,
}

Expand Down Expand Up @@ -328,6 +329,13 @@ impl MetaMetrics {
)
.unwrap();

let level_compact_task_cnt = register_int_gauge_vec_with_registry!(
"storage_level_compact_task_cnt",
"num of compact_task organized by group and level",
&["task"],
registry
)
.unwrap();
let object_store_metric = Arc::new(ObjectStoreMetrics::new(registry.clone()));

Self {
Expand Down Expand Up @@ -365,6 +373,7 @@ impl MetaMetrics {
compact_pending_bytes,
compact_level_compression_ratio,
scale_compactor_core_num,
level_compact_task_cnt,
object_store_metric,
}
}
Expand Down
17 changes: 10 additions & 7 deletions src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl Compactor {
context
.compactor_metrics
.compact_read_current_level
.with_label_values(&[group_label.as_str(), cur_level_label.as_str()])
.with_label_values(&[&group_label, &cur_level_label])
.inc_by(
select_table_infos
.iter()
Expand All @@ -140,26 +140,29 @@ impl Compactor {
context
.compactor_metrics
.compact_read_sstn_current_level
.with_label_values(&[group_label.as_str(), cur_level_label.as_str()])
.with_label_values(&[&group_label, &cur_level_label])
.inc_by(select_table_infos.len() as u64);

let sec_level_read_bytes = target_table_infos.iter().map(|t| t.file_size).sum::<u64>();
let next_level_label = compact_task.target_level.to_string();
context
.compactor_metrics
.compact_read_next_level
.with_label_values(&[group_label.as_str(), next_level_label.as_str()])
.with_label_values(&[&group_label, next_level_label.as_str()])
.inc_by(sec_level_read_bytes);
context
.compactor_metrics
.compact_read_sstn_next_level
.with_label_values(&[group_label.as_str(), next_level_label.as_str()])
.with_label_values(&[&group_label, next_level_label.as_str()])
.inc_by(target_table_infos.len() as u64);

let timer = context
.compactor_metrics
.compact_task_duration
.with_label_values(&[compact_task.input_ssts[0].level_idx.to_string().as_str()])
.with_label_values(&[
&group_label,
&compact_task.input_ssts[0].level_idx.to_string(),
])
.start_timer();

let (need_quota, file_counts) = estimate_state_for_compaction(&compact_task);
Expand Down Expand Up @@ -313,12 +316,12 @@ impl Compactor {
context
.compactor_metrics
.compact_write_bytes
.with_label_values(&[group_label.as_str(), level_label.as_str()])
.with_label_values(&[&group_label, level_label.as_str()])
.inc_by(compaction_write_bytes);
context
.compactor_metrics
.compact_write_sstn
.with_label_values(&[group_label.as_str(), level_label.as_str()])
.with_label_values(&[&group_label, level_label.as_str()])
.inc_by(compact_task.sorted_output_ssts.len() as u64);

if let Err(e) = context
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/monitor/compactor_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl CompactorMetrics {
exponential_buckets(0.1, 1.6, 28).unwrap() // max 52000s
);
let compact_task_duration =
register_histogram_vec_with_registry!(opts, &["level"], registry).unwrap();
register_histogram_vec_with_registry!(opts, &["group", "level"], registry).unwrap();
let opts = histogram_opts!(
"compactor_get_table_id_total_time_duration",
"Total time of compact that have been issued to state store",
Expand Down