Skip to content

Commit

Permalink
feat: add barrier align metrics for merge executor (#16898)
Browse files Browse the repository at this point in the history
  • Loading branch information
lmatz committed May 23, 2024
1 parent 67d9934 commit 7716391
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 8 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,24 @@ def section_streaming_actors(outer_panels):
),
],
),
panels.timeseries_actor_latency(
"Merger Barrier Align",
"",
[
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('stream_merge_barrier_align_duration_bucket')}[$__rate_interval])) by (le, fragment_id, {COMPONENT_LABEL}))",
f"p{legend} - fragment {{{{fragment_id}}}} - {{{{{COMPONENT_LABEL}}}}}",
),
[90, 99, 999, "max"],
),
panels.target(
f"sum by(le, fragment_id, job)(rate({metric('stream_merge_barrier_align_duration_sum')}[$__rate_interval])) / sum by(le,fragment_id,{COMPONENT_LABEL}) (rate({metric('stream_merge_barrier_align_duration_count')}[$__rate_interval])) > 0",
"avg - fragment {{fragment_id}} - {{%s}}"
% COMPONENT_LABEL,
),
],
),
panels.timeseries_percentage(
"Join Actor Input Blocking Time Ratio",
"",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

43 changes: 38 additions & 5 deletions src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ impl MergeExecutor {
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(mut self: Box<Self>) {
// Futures of all active upstreams.
let select_all = SelectReceivers::new(self.actor_context.id, self.upstreams);
let select_all = SelectReceivers::new(
self.actor_context.id,
self.actor_context.fragment_id,
self.upstreams,
self.metrics.clone(),
);
let actor_id = self.actor_context.id;

let mut metrics = ActorInputMetrics::new(
Expand Down Expand Up @@ -185,8 +190,12 @@ impl MergeExecutor {

// Poll the first barrier from the new upstreams. It must be the same as
// the one we polled from original upstreams.
let mut select_new =
SelectReceivers::new(self.actor_context.id, new_upstreams);
let mut select_new = SelectReceivers::new(
self.actor_context.id,
self.fragment_id,
new_upstreams,
self.metrics.clone(),
);
let new_barrier = expect_first_barrier(&mut select_new).await?;
assert_eq!(barrier, &new_barrier);

Expand Down Expand Up @@ -251,8 +260,12 @@ pub struct SelectReceivers {

/// The actor id of this fragment.
actor_id: u32,
/// The fragment id
fragment_id: u32,
/// watermark column index -> `BufferedWatermarks`
buffered_watermarks: BTreeMap<usize, BufferedWatermarks<ActorId>>,
/// Streaming Metrics
metrics: Arc<StreamingMetrics>,
}

impl Stream for SelectReceivers {
Expand All @@ -265,6 +278,11 @@ impl Stream for SelectReceivers {
return Poll::Ready(None);
}

let merge_barrier_align_duration = self
.metrics
.merge_barrier_align_duration
.with_label_values(&[&self.actor_id.to_string(), &self.fragment_id.to_string()]);
let mut start = None;
loop {
match futures::ready!(self.active.poll_next_unpin(cx)) {
// Directly forward the error.
Expand All @@ -289,6 +307,9 @@ impl Stream for SelectReceivers {
}
Message::Barrier(barrier) => {
// Block this upstream by pushing it to `blocked`.
if self.blocked.is_empty() {
start = Some(Instant::now());
}
self.blocked.push(remaining);
if let Some(current_barrier) = self.barrier.as_ref() {
if current_barrier.epoch != barrier.epoch {
Expand All @@ -314,7 +335,12 @@ impl Stream for SelectReceivers {
// So this branch will never be reached in all cases.
Some((None, _)) => unreachable!(),
// There's no active upstreams. Process the barrier and resume the blocked ones.
None => break,
None => {
if let Some(start) = start {
merge_barrier_align_duration.observe(start.elapsed().as_secs_f64())
}
break;
}
}
}

Expand All @@ -336,16 +362,23 @@ impl Stream for SelectReceivers {
}

impl SelectReceivers {
fn new(actor_id: u32, upstreams: Vec<BoxedInput>) -> Self {
fn new(
actor_id: u32,
fragment_id: u32,
upstreams: Vec<BoxedInput>,
metrics: Arc<StreamingMetrics>,
) -> Self {
assert!(!upstreams.is_empty());
let upstream_actor_ids = upstreams.iter().map(|input| input.actor_id()).collect();
let mut this = Self {
blocked: Vec::with_capacity(upstreams.len()),
active: Default::default(),
actor_id,
fragment_id,
barrier: None,
upstream_actor_ids,
buffered_watermarks: Default::default(),
metrics,
};
this.extend_active(upstreams);
this
Expand Down
27 changes: 26 additions & 1 deletion src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ pub struct StreamingMetrics {
// Exchange (see also `compute::ExchangeServiceMetrics`)
pub exchange_frag_recv_size: GenericCounterVec<AtomicU64>,

// Streaming Merge (We break out this metric from `barrier_align_duration` because
// the alignment happens on different levels)
pub merge_barrier_align_duration: RelabeledGuardedHistogramVec<2>,

// Backpressure
pub actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounterVec<3>,
pub actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounterVec<3>,
Expand All @@ -88,7 +92,7 @@ pub struct StreamingMetrics {
pub join_cached_entry_count: LabelGuardedIntGaugeVec<3>,
pub join_matched_join_keys: RelabeledGuardedHistogramVec<3>,

// Streaming Join and Streaming Dynamic Filter
// Streaming Join, Streaming Dynamic Filter and Streaming Union
pub barrier_align_duration: RelabeledGuardedHistogramVec<4>,

// Streaming Aggregation
Expand Down Expand Up @@ -400,6 +404,26 @@ impl StreamingMetrics {
)
.unwrap();

let opts = histogram_opts!(
"stream_merge_barrier_align_duration",
"Duration of merge align barrier",
exponential_buckets(0.0001, 2.0, 21).unwrap() // max 104s
);
let merge_barrier_align_duration = register_guarded_histogram_vec_with_registry!(
opts,
&["actor_id", "fragment_id"],
registry
)
.unwrap();

let merge_barrier_align_duration =
RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
MetricLevel::Debug,
merge_barrier_align_duration,
level,
1,
);

let join_lookup_miss_count = register_guarded_int_counter_vec_with_registry!(
"stream_join_lookup_miss_count",
"Join executor lookup miss duration",
Expand Down Expand Up @@ -1123,6 +1147,7 @@ impl StreamingMetrics {
mview_input_row_count,
sink_chunk_buffer_size,
exchange_frag_recv_size,
merge_barrier_align_duration,
actor_output_buffer_blocking_duration_ns,
actor_input_buffer_blocking_duration_ns,
join_lookup_miss_count,
Expand Down

0 comments on commit 7716391

Please sign in to comment.