
simplify batching on flush to cortex sink #1022

Open

wants to merge 7 commits into base: master
Conversation

mcowgill-stripe

Summary

This change uses a simple goroutine + channel to implement the batching logic. Breaking out of the select loop requires a label; I did avoid using goto.

Motivation

Simplifying the code enables using a channel with select without a function closure.

Test plan

Updated existing tests; there is no functional change. However, one test does require an update because the async behavior changes subtly.

Rollout/monitoring/revert plan

This change can be reverted and should not change the behavior after deploy.

@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.

@mcowgill-stripe mcowgill-stripe changed the title update to use a channel for batching on flush to cortex sink simplify batching on flush to cortex sink Dec 30, 2022
}

doIfNotDone := func(fn func() error) error {
batching:
Contributor

I'm sure we can avoid using a label. Should a method be introduced?

Contributor

Or a done bool for the for loop
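For what it's worth, the done-flag alternative folded into the loop condition would read something like this (a hypothetical sketch, not the sink's actual code):

```go
package main

import "fmt"

// processUntilDone walks items until a stop condition fires, using a done
// flag in the for condition instead of a labeled break.
func processUntilDone(items []int, stopAt int) int {
	processed := 0
	done := false
	for i := 0; i < len(items) && !done; i++ {
		if items[i] == stopAt { // stand-in for the ctx.Done() case
			done = true
			continue
		}
		processed++
	}
	return processed
}

func main() {
	fmt.Println(processUntilDone([]int{1, 2, 3, 4, 5}, 4))
}
```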

Comment on lines +218 to +221
end := i + batchSize
if end > len(metrics) {
end = len(metrics)
}
Contributor

Suggested change
end := i + batchSize
if end > len(metrics) {
end = len(metrics)
}
end := minInt(i+batchSize, len(metrics))

(math.Min operates on float64, so this would need a small int helper like minInt; the min builtin only arrives in Go 1.21.)
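One wrinkle with the math.Min suggestion: it is defined on float64, so int code needs a small helper (the min builtin is only available from Go 1.21). A sketch with a hypothetical minInt:

```go
package main

import "fmt"

// minInt is a hypothetical helper; math.Min takes float64, and the min
// builtin isn't available before Go 1.21.
func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func main() {
	metrics := make([]int, 10)
	batchSize, i := 3, 9
	end := minInt(i+batchSize, len(metrics)) // clamps the final, short batch
	fmt.Println(end)
}
```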

Comment on lines +225 to +226
droppedMetrics += len(metrics[i:])
break batching
Contributor

Previous behavior made this drop observable via s.logger.Error(err).

batch = []samplers.InterMetric{}
err := s.writeMetrics(ctx, batch)
if err != nil {
allErrs = multierror.Append(allErrs, err)
Contributor

If I'm reading this right...
Previously, a single failure stopped processing (the method returned). Now it continues through the remaining batches?

This sounds like a fix, but it might cause an unintended build-up of failures when the remote is failing.

If this was intended, I think a test should be added that failed before and passes now. That'll also help show @arnavdugar-stripe the functional change for validation.
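To make the behavioral difference concrete, here's a minimal stdlib-only sketch (hypothetical names; the PR itself uses hashicorp/go-multierror) of accumulate-and-continue versus returning on the first error:

```go
package main

import (
	"errors"
	"fmt"
)

// writeBatch stands in for s.writeMetrics: one bad batch fails.
func writeBatch(batch []int) error {
	if len(batch) > 0 && batch[0] == 3 {
		return errors.New("remote write failed")
	}
	return nil
}

// flushAll keeps going on error and collects every failure, mirroring the
// new multierror.Append behavior instead of returning on first failure.
func flushAll(batches [][]int) (written int, errs []error) {
	for _, b := range batches {
		if err := writeBatch(b); err != nil {
			errs = append(errs, err) // accumulate, don't bail out
			continue
		}
		written += len(b)
	}
	return written, errs
}

func main() {
	written, errs := flushAll([][]int{{1, 2}, {3, 4}, {5}})
	fmt.Println(written, len(errs))
}
```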

@@ -13,6 +13,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
"github.com/hashicorp/go-multierror"
Contributor

No concern from me, but if anyone has concern about increasing the dependency surface area alternatives are listed here: https://stackoverflow.com/questions/33470649/combine-multiple-error-strings

Contributor

Oh nice! golang/go#53435 adds errors.Join, but that lands in Go 1.20 (which we can't use yet).

if end > len(metrics) {
end = len(metrics)
}
batch := metrics[i:end]
Contributor

Previously, this made a new array and slice for batch.
Now it reuses a slice of metrics.

I had to read https://go.dev/blog/slices-intro, but it looks like

  1. this will be more performant
  2. anything batch is passed to that mutates it (including via append()) may write into the backing array of metrics

I think the change below fixes it (alternatively we can read the code everywhere batch is passed, but then we have to hope it never changes)?

Suggested change
batch := metrics[i:end]
batch := metrics[i:end:end]

(The third index of a full slice expression is absolute, like the other two, so it's end, not end-i; this caps the batch's capacity at its length.)
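The aliasing hazard and the full-slice-expression fix can be demonstrated in isolation; note the correct form is metrics[i:end:end], since the capacity index is absolute (batchOf below is a hypothetical helper, not code from the PR):

```go
package main

import "fmt"

// batchOf returns metrics[i:end] with capacity clamped to the batch, so
// any append by a callee must copy rather than scribble over metrics.
func batchOf(metrics []int, i, end int) []int {
	return metrics[i:end:end] // third index is absolute, like the other two
}

func main() {
	metrics := []int{1, 2, 3, 4, 5}

	// Plain reslice: shares spare capacity, so append overwrites metrics[2].
	plain := metrics[0:2]
	plain = append(plain, 99)
	fmt.Println(metrics[2]) // 99

	// Full slice expression: cap == len, so append allocates a new array.
	metrics = []int{1, 2, 3, 4, 5}
	safe := batchOf(metrics, 0, 2)
	safe = append(safe, 99)
	fmt.Println(metrics[2]) // 3
}
```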

select {
case <-ctx.Done():
return errors.New("context finished before completing metrics flush")
droppedMetrics += len(metrics[i:])
Contributor

probably not important, but since we are looking at performance improvements anyway...
metrics[i:] builds a new slice header just to read its length. We can avoid that by...

Suggested change
droppedMetrics += len(metrics[i:])
droppedMetrics += len(metrics) - i

}

doIfNotDone := func(fn func() error) error {
Contributor

👏 hooray for not needing a closure now!
