feat(source): add metrics for throughput of each partition #3505
Conversation
Signed-off-by: tabVersion <tabvision@bupt.icu>
license-eye checked 879 files in total.

| Valid | Invalid | Ignored | Fixed |
|---|---|---|---|
| 876 | 2 | 1 | 0 |

Invalid file list:
- src/source/src/monitor/metrics.rs
- src/source/src/monitor/mod.rs
Signed-off-by: tabVersion <tabvision@bupt.icu>
src/stream/src/executor/source.rs (outdated)

```rust
.stream_reader(
    state,
    self.column_ids.clone(),
    self.metrics.registry.clone(),
```
@wcy-fdu I had some trouble building `SourceMetrics` from this registry. Please take a look.
Are these metrics available in redpanda / kafka-exporter? I wonder whether it is also necessary to monitor them on our side… Rest LGTM
Generally LGTM!
src/source/src/connector_source.rs (outdated)

```rust
self.metrics
    .partition_input_count
    .with_label_values(&[
        self.context.actor_id.to_string().as_str(),
```
To reduce overhead, a new field `actor_id_str` could be added to `self.context` and used here. That way, the frequent type conversions can be avoided.
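The suggestion above can be sketched as follows. This is a minimal, hypothetical illustration (not the actual risingwave code): the stringified ids are computed once when the context is built, so the hot path only borrows `&str` labels instead of calling `to_string()` per message.

```rust
// Hypothetical context sketch: cache label strings once at construction.
#[allow(dead_code)]
struct SourceContext {
    actor_id: u32,
    source_id: u32,
    // Cached string forms, computed a single time:
    actor_id_str: String,
    source_id_str: String,
}

impl SourceContext {
    fn new(actor_id: u32, source_id: u32) -> Self {
        Self {
            actor_id,
            source_id,
            actor_id_str: actor_id.to_string(),
            source_id_str: source_id.to_string(),
        }
    }

    // Hot path: no allocation, just borrow the cached strings.
    fn label_values<'a>(&'a self, partition: &'a str) -> [&'a str; 3] {
        [&self.actor_id_str, &self.source_id_str, partition]
    }
}

fn main() {
    let ctx = SourceContext::new(7, 42);
    assert_eq!(ctx.label_values("p0"), ["7", "42", "p0"]);
    println!("labels = {:?}", ctx.label_values("p0"));
}
```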
src/source/src/connector_source.rs (outdated)

```rust
.partition_input_count
.with_label_values(&[
    self.context.actor_id.to_string().as_str(),
    self.context.source_id.to_string().as_str(),
```
…and it can be used here as well.
src/source/src/connector_source.rs (outdated)

```rust
.with_label_values(&[
    self.context.actor_id.to_string().as_str(),
    self.context.source_id.to_string().as_str(),
    id.as_str(),
```
A similar change can be made here.
It's normal for different sources to have different metrics. Take the Flink connector metrics as an example: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#connectors
Sure, Kafka provides metrics like this, but currently we do not commit offsets to Kafka, so it does not know the consumption progress. cc @shanicky Besides, there is a bounded channel between the connector and the executor, and tokio scheduling of that channel affects the observed throughput, so I think it is not appropriate to rely on them.
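The bounded-channel point above can be illustrated with a small std-only sketch (using `std::sync::mpsc::sync_channel` in place of the real tokio channel, and an `AtomicU64` in place of the prometheus counter — both are stand-ins, not the PR's code). Counting on the ingest side records what the connector can pull even while the downstream consumer has not read anything yet; counting on the receive side would conflate source throughput with consumer scheduling.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::sync_channel;

fn main() {
    // Stand-in for the per-partition counter in the real code.
    let ingested = AtomicU64::new(0);
    // Bounded channel, like the one between connector source and executor.
    let (tx, rx) = sync_channel::<u64>(4);

    for msg in 0..4 {
        // Count at ingest time, *before* handing off to the channel.
        ingested.fetch_add(1, Ordering::Relaxed);
        tx.send(msg).unwrap(); // may block once the buffer is full
    }
    drop(tx);

    // The consumer has read nothing yet, but the metric already
    // reflects the 4 messages the source produced.
    assert_eq!(ingested.load(Ordering::Relaxed), 4);
    assert_eq!(rx.iter().count(), 4);
}
```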
LGTM
Signed-off-by: tabVersion <tabvision@bupt.icu>
Codecov Report

```
@@            Coverage Diff            @@
##             main    #3505    +/-  ##
========================================
  Coverage   74.48%   74.49%
========================================
  Files         770      771       +1
  Lines      108140   108209      +69
========================================
+ Hits        80547    80605      +58
- Misses      27593    27604      +11
```

Flags with carried forward coverage won't be shown.
Signed-off-by: tabVersion <tabvision@bupt.icu>
Signed-off-by: tabVersion <tabvision@bupt.icu>
```python
@@ -369,6 +369,9 @@ def section_streaming(panels):
        panels.target(
            "rate(stream_source_output_rows_counts[15s])", "source_id = {{source_id}}"
        ),
        panels.target(
            "rate(partition_input_count[5s])", "{{actor_id}}-{{source_id}}-{{partition}}"
```
5s doesn't make much sense, as our collect interval is set to 15s by default.
I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.
What's changed and what's your intention?
PLEASE DO NOT LEAVE THIS EMPTY !!!
Please explain IN DETAIL what the changes are in this PR and why they are needed:
As the title says: the metric records throughput before data is sent to the channel in the connector source, which indicates the maximum throughput of the source.
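The per-partition bookkeeping described above can be sketched with a plain `HashMap` in place of a prometheus `CounterVec` (the struct and method names here are illustrative, not the PR's actual types; the label tuple mirrors the grafana panel: actor_id, source_id, partition).

```rust
use std::collections::HashMap;

// Illustrative stand-in for a labeled counter keyed by
// (actor_id, source_id, partition).
#[derive(Default)]
struct PartitionInputCount {
    counts: HashMap<(String, String, String), u64>,
}

impl PartitionInputCount {
    // Bump the counter for one partition by `n` messages.
    fn inc(&mut self, actor_id: &str, source_id: &str, partition: &str, n: u64) {
        *self
            .counts
            .entry((actor_id.into(), source_id.into(), partition.into()))
            .or_insert(0) += n;
    }

    // Read the current count; missing label sets read as 0.
    fn get(&self, actor_id: &str, source_id: &str, partition: &str) -> u64 {
        self.counts
            .get(&(actor_id.into(), source_id.into(), partition.into()))
            .copied()
            .unwrap_or(0)
    }
}

fn main() {
    let mut m = PartitionInputCount::default();
    m.inc("1", "2", "p0", 3);
    m.inc("1", "2", "p0", 2);
    m.inc("1", "2", "p1", 1);
    assert_eq!(m.get("1", "2", "p0"), 5);
    assert_eq!(m.get("1", "2", "p1"), 1);
}
```

A monitoring system then derives throughput from this monotonically increasing counter, e.g. with PromQL's `rate()` over a window, as in the dashboard panel above.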
Checklist

- `./risedev check` (or alias, `./risedev c`)

Refer to a related PR or issue link (optional)
close #3504