feat(metrics): add backpressure metrics #3636
actor_coroutine_monitor_tasks: HashMap<ActorId, JoinHandle<()>>,

/// Stores all actor output buffer monitoring tasks.
actor_output_buffer_monitor_tasks: HashMap<ActorId, JoinHandle<()>>,
Can we have a single extra coroutine for all metrics collection?
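One way to read this suggestion, as a minimal std-only sketch: every actor registers a small gauge in a shared registry, and a single monitor task calls `sample_all` on a timer instead of one task being spawned per actor. `BufferGauge`, `MonitorRegistry`, and the `u32` actor id are hypothetical names for illustration, not types from this PR.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for one actor's output buffer state.
pub struct BufferGauge {
    used: AtomicUsize,
    capacity: usize,
}

impl BufferGauge {
    /// Called by the actor whenever its buffer usage changes (clamped to capacity).
    pub fn set_used(&self, used: usize) {
        self.used.store(used.min(self.capacity), Ordering::Relaxed);
    }

    /// Fraction of the buffer currently in use, in [0, 1].
    pub fn occupancy(&self) -> f64 {
        if self.capacity == 0 {
            return 0.0;
        }
        self.used.load(Ordering::Relaxed) as f64 / self.capacity as f64
    }
}

/// Hypothetical registry shared by all actors and read by one monitor task.
#[derive(Default)]
pub struct MonitorRegistry {
    gauges: Mutex<HashMap<u32, Arc<BufferGauge>>>,
}

impl MonitorRegistry {
    /// Registers an actor and returns the gauge it should update.
    pub fn register(&self, actor_id: u32, capacity: usize) -> Arc<BufferGauge> {
        let gauge = Arc::new(BufferGauge {
            used: AtomicUsize::new(0),
            capacity,
        });
        self.gauges
            .lock()
            .unwrap()
            .insert(actor_id, Arc::clone(&gauge));
        gauge
    }

    /// Removes an actor when it is dropped.
    pub fn deregister(&self, actor_id: u32) {
        self.gauges.lock().unwrap().remove(&actor_id);
    }

    /// One sampling pass over every registered actor.
    /// A single monitor task would call this periodically.
    pub fn sample_all(&self) -> HashMap<u32, f64> {
        self.gauges
            .lock()
            .unwrap()
            .iter()
            .map(|(id, gauge)| (*id, gauge.occupancy()))
            .collect()
    }
}
```

With this shape there is one handle to manage rather than one `JoinHandle` per actor, at the cost of a lock shared between registration and sampling.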
This panics because the dispatcher metrics future is not started for some actors. Please check.
Flink definitely doesn't measure backpressure the way this PR does. I think we had better make this metric accurate, rather than leave future developers confused about why it doesn't reflect the real situation.
The current implementation uses a separate coroutine task to calculate the backpressure rate. The problem is that the metric's visibility depends on whether the monitor task is scheduled on time. A better approach would be for each actor to report the metric itself.
This PR does not implement accurate backpressure monitoring. If that's necessary, the implementation can be changed to measure how long each send blocks:
async fn send(&mut self, message: Message) -> Result<()> {
    // Start timing only when the buffer is already full, i.e. this send will block.
    let start_time = if self.ch.capacity() == 0 {
        Some(Instant::now())
    } else {
        None
    };
    // local channel should never fail
    self.ch
        .send(message)
        .await
        .map_err(|_| internal_error("failed to send"))?;
    // Record how long the send was blocked on the full output buffer.
    if let Some(start_time) = start_time {
        self.metrics
            .output_buffer_blocking_time
            .with_label_values(&[&up_actor_id])
            .inc_by(start_time.elapsed().as_nanos() as u64);
    }
    Ok(())
}
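For readers who want to try the idea outside the codebase, here is a synchronous, std-only sketch of the same technique. It assumes `std::sync::mpsc::sync_channel` in place of the async channel used in the PR, and `send_timed` is a hypothetical helper name. Instead of checking the capacity first, it attempts a non-blocking send and starts the timer only if the buffer turns out to be full, so there is no gap between the check and the send.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};
use std::time::{Duration, Instant};

/// Sends `message` and returns how long the call was blocked on a full buffer.
/// Returns `Duration::ZERO` when the non-blocking fast path succeeds.
/// (Hypothetical helper for illustration, not part of the PR.)
pub fn send_timed<T>(tx: &SyncSender<T>, message: T) -> Result<Duration, &'static str> {
    match tx.try_send(message) {
        // Buffer had room: no backpressure observed for this message.
        Ok(()) => Ok(Duration::ZERO),
        // Buffer is full: time the blocking send.
        Err(TrySendError::Full(message)) => {
            let start = Instant::now();
            tx.send(message).map_err(|_| "failed to send")?;
            Ok(start.elapsed())
        }
        // Receiver is gone.
        Err(TrySendError::Disconnected(_)) => Err("failed to send"),
    }
}
```

The returned duration could then be added to a per-actor counter, in the same way the snippet above calls `inc_by`.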
I benchmarked the overhead on my M1:
#[tokio::main]
async fn main() {
    let buffer_size = 10000;
    // `_rx` is kept alive so that sends succeed; the channel never fills up,
    // so this measures only the overhead of timing each send.
    let (tx, _rx) = tokio::sync::mpsc::channel(buffer_size);
    tokio::spawn(async move {
        let mut cnt = 0;
        for i in 0..buffer_size {
            let start_time = madsim::time::Instant::now();
            tx.send(i).await.unwrap();
            cnt += start_time.elapsed().as_nanos();
        }
        // Average time per send, in nanoseconds.
        println!("{} ns", cnt as f64 / buffer_size as f64);
    }).await.unwrap();
}
The result is only a few hundred nanoseconds per send.
Codecov Report
@@ Coverage Diff @@
## main #3636 +/- ##
==========================================
- Coverage 74.37% 74.36% -0.02%
==========================================
Files 776 776
Lines 110163 110213 +50
==========================================
+ Hits 81937 81958 +21
- Misses 28226 28255 +29
LGTM. There are 39 commits in this PR; please remember to clean up the commit log when you merge it.
@@ -65,8 +65,10 @@ pub struct LocalStreamManagerCore {

    /// Stores all actor information, taken after actor built.
    actors: HashMap<ActorId, stream_plan::StreamActor>,
    /// Store all actor execution time monitoring tasks.

    /// Stores all actor tokio runtime monitoring tasks.
    actor_monitor_tasks: HashMap<ActorId, JoinHandle<()>>,
remove this?
It's still used for another metric. I'm thinking of moving that metric into the actor itself, instead of spawning new tasks.
Should have fixed all the comments.
I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.
What's changed and what's your intention?
Add backpressure metrics. Similar to Flink's backpressure monitoring, we estimate the backpressure rate by sampling the occupancy rate of each actor's output buffer.
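As a rough illustration of the sampling idea (not the code in this PR), the rate can be thought of as the mean buffer occupancy over periodic samples. `OccupancySampler` is a hypothetical name, and the `available` and `capacity` arguments are assumed to come from whatever the channel exposes.

```rust
/// Accumulates periodic samples of one output buffer's occupancy.
/// (Hypothetical type for illustration only.)
#[derive(Debug, Default)]
pub struct OccupancySampler {
    sum: f64,
    count: u64,
}

impl OccupancySampler {
    /// Records one sample: `available` free slots out of `capacity` total.
    pub fn record(&mut self, available: usize, capacity: usize) {
        if capacity == 0 {
            return; // nothing meaningful to sample
        }
        let used = capacity.saturating_sub(available);
        self.sum += used as f64 / capacity as f64;
        self.count += 1;
    }

    /// Mean occupancy over all samples, in [0, 1]; `None` before the first sample.
    pub fn rate(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
}
```

A rate near 1 means the buffer was full at most sampling points, which suggests the downstream actor is the bottleneck. The estimate is only as good as the sampling schedule, which is the concern raised in the review comments.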
Bench result in 3-CN mode:
Checklist
./risedev check (or alias, ./risedev c)
Refer to a related PR or issue link (optional)
#3284