[processor/interval] Implement the main logic #32054
Conversation
This PR was marked stale due to lack of activity. It will be closed in 14 days.
Force-pushed from dd599e7 to a902abc
Force-pushed from a902abc to 11c4a6d
@sh0rez @jpkrohling This PR is ready for review now. Can you help me understand why the tests are failing? It says that the go.sum is missing a reference to … But in the … Which matches all the other processors. Locally it compiles fine. So I'm a bit stumped...
I just discovered …
@sh0rez @djaglowski @jpkrohling I have updated the tests, and this is ready for review again. (Though I'm still stumped why the CI tests are failing...)
It appears to me this processor can be simplified quite a lot (and made more efficient) by having aggregate create what export needs. Some pseudo-code of the idea:
- keep the last datapoint per stream in memory; replace it with another datapoint if that one has a more recent `.Timestamp()`
- store them in a pmetric.Metrics. Populate resource, scope and metric as needed from the incoming sample. Have some maps for lookups (they store only the pointer types, no actual data)
- export is now trivial: return the already built pmetric.Metrics, clear the state
```go
type State struct {
	raw  pmetric.Metrics
	rms  map[identity.Resource]pmetric.ResourceMetrics
	sms  map[identity.Scope]pmetric.ScopeMetrics
	ms   map[identity.Metric]pmetric.Metric
	nums map[identity.Stream]pmetric.NumberDataPoint
}

func (s State) Update(res Resource, scope Scope, m Metric) {
	lock()
	defer unlock()
	for _, dp := range m.DataPoints {
		id := identity.OfStream(dp)
		aggr := s.Number(id, res, scope, m)
		if aggr.Ts < dp.Ts {
			dp.CopyTo(aggr)
		}
	}
}

func (s State) Number(id identity.Stream, res Resource, scope Scope, m Metric) pmetric.NumberDataPoint {
	// returns or creates the pmetric.NumberDataPoint in s.nums for the given stream.
	// populates the required rms, sms and ms in raw as needed
}

func (s State) Export() pmetric.Metrics {
	lock()
	defer unlock()
	out := s.raw
	s.raw = pmetric.NewMetrics()
	clear(s.rms)
	clear(s.sms)
	clear(s.ms)
	clear(s.nums)
	return out
}
```
I think this could minimize memory usage and requires a lot less data conversion.
Does this make sense, or did I miss something important?
I like your idea! It would also solve the issue that …
Force-pushed from ddcee21 to bd53d3f
@sh0rez Updated. Ready for re-review as you're able :)
Force-pushed from 6de69ee to 920a778
No need to copy as much data back and forth
Force-pushed from c547623 to 03a3f79
@RichieSams any specific reason to force-push this PR? It makes reviewing really hard, as recent changes can't be easily identified.
"go.opentelemetry.io/collector/pdata/pmetric" | ||
) | ||
|
||
type DataPointSlice[DP DataPoint[DP]] interface { |
these types are only locally used once in aggregateDataPoints, doesn't need an extra package, right?
Yes, that package has gotten much, much smaller since the original design. That said, I'm working on a PR for another component and needed the same interfaces, so I'm potentially going to put them in internal/exp/metrics/streams. See:
RichieSams@d16a215#diff-45d5f87d8148e341ebb68b4125ebe4296ccfbb1a9b63a45009358e7a0b617d26
Thoughts?
```go
}

func (p *Processor) exportMetrics() {
	md := func() pmetric.Metrics {
```
why the closure?
So the lock only lasts for the fetch, not for the duration of ConsumeMetrics, since defer runs on function exit, not scope exit.
Heh, a combination of habits. At work we're not allowed to have merge commits, so I generally rebase, squash commits locally, and force-push. GitHub has little buttons to show the diff, but looking at it, it's not ideal, since it includes all the changes from main as well. This project always squashes, so I'll do a merge commit next time to catch up to HEAD. Then you can see my changes as individual commits.
@djaglowski @evan-bradley @TylerHelmuth Any further questions or concerns?
Can someone re-kick the coverage task? It's failing due to rate-limiting.
Is this ok to merge if I update to main once more?
Description:
This PR implements the main logic of this new processor.
Link to tracking Issue:
#29461
This is a re-opening of #30827
Testing:
I added a test for the main aggregation/export behavior, but I need to add more to test state expiry.
Documentation:
I updated the README with the updated configs, but I should add some examples as well.