wait-free buffer #75
Conversation
…the buffer size in an empty buffer
This looks great but I'm worried that there might be diminishing returns as things get more busy. Perhaps that suspicion is unfounded in reality but if this is lockless then is there the potential to get high amounts of contention?
I recognize the fact that if you're having contention writing metrics, your problems have to do with the metrics you're writing but it's food for thought.
buffer.go (Outdated)

    } else {
        b.buffer.append(chunk)
    }
    if length != 0 {
I don't understand what this is doing. If we get `length` before updating `buffer.data` on line 53, couldn't we potentially be truncating things off here? I would get truncating to size, but I'm sure I'm just missing something.
It prevents exceeding the buffer size in the common case; when the buffer is full it’ll get flushed and the code loops back to write the measures.
Now that I’m re-reading it, I realize this code would likely cause an infinite loop since we’ll keep retrying. Thanks for pointing it out (seems like this is calling for a test).
I thought the break statement above would take care of not exceeding the buffer size.
Totally missed the infinite loop after the write, though. I think I'd convinced myself I understood what you were getting at and missed that.
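To make the discussion concrete, here is a minimal, self-contained sketch of a bounded flush-and-retry write loop. The `buffer` type, the `write` signature, and the flush callback are hypothetical stand-ins, not the PR's actual code; the point is that rejecting chunks larger than the buffer up front is what keeps the retry loop from spinning forever:

```go
package main

import (
	"bytes"
	"fmt"
)

// buffer is a hypothetical stand-in for the PR's internal buffer type.
type buffer struct {
	data bytes.Buffer
	size int // capacity threshold before a flush is forced
}

// write appends chunk, flushing first when the write would exceed the
// size limit. Returning an error for oversized chunks bounds the loop:
// without this check, a chunk larger than the whole buffer would flush
// and retry forever.
func (b *buffer) write(chunk []byte, flush func([]byte)) error {
	if len(chunk) > b.size {
		return fmt.Errorf("chunk of %d bytes exceeds buffer size %d", len(chunk), b.size)
	}
	for {
		if b.data.Len()+len(chunk) <= b.size {
			b.data.Write(chunk)
			return nil
		}
		// Buffer would overflow: flush it and loop back to retry the write.
		flush(b.data.Bytes())
		b.data.Reset()
	}
}

func main() {
	b := &buffer{size: 8}
	flushed := 0
	flush := func(p []byte) { flushed++ }

	b.write([]byte("abcd"), flush)                       // fits
	b.write([]byte("efghi"), flush)                      // forces a flush, then fits
	err := b.write(bytes.Repeat([]byte("x"), 16), flush) // oversized: rejected

	fmt.Println(flushed, err != nil) // → 1 true
}
```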
buffer.go (Outdated)

    }
    // Flush satisfies the Flusher interface.
    func (b *Buffer) Flush() {
        b.init(b.bufferSize())
Is this meant to be here? `init` looks like it zeros buffers to me. Maybe that's what you want, but it doesn't sound like a `Flush` kind of thing to me. However, I don't know how this is used as a flusher.
I see, `init` is a `once.Do` and we don't want to flush before the buffers are initialized. Still a bit confusing to the reader; I wonder if there's a cleaner way. It's also confusing going between the `Buffer` and `buffer` types in this file when they share method names.
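The shape being discussed here, `init` wrapped in a `once.Do` so that `Flush` is safe to call before anything has been written, can be sketched like this. The names and the default size are illustrative assumptions, not the PR's actual code:

```go
package main

import (
	"fmt"
	"sync"
)

// Buffer sketches the exported type under discussion: init is guarded by
// sync.Once so that Flush (or any other method) can run safely before the
// first write, without re-zeroing live data on later calls.
type Buffer struct {
	once sync.Once
	data []byte
	size int
}

// init runs its body exactly once, no matter how many callers race to it.
func (b *Buffer) init(size int) {
	b.once.Do(func() {
		b.size = size
		b.data = make([]byte, 0, size)
	})
}

// Flush satisfies a Flusher-style interface; calling init first makes it
// a harmless no-op on an unused buffer instead of a nil-slice surprise.
func (b *Buffer) Flush() {
	b.init(1024) // hypothetical default size; only the first call has effect
	fmt.Printf("flushing %d bytes\n", len(b.data))
	b.data = b.data[:0]
}

func main() {
	var b Buffer
	b.Flush() // safe before any write: prints "flushing 0 bytes"
	b.data = append(b.data, "hello"...)
	b.Flush() // prints "flushing 5 bytes"
}
```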
How about renaming the method initOnce, or prepare?
I think I was confused more by the two init methods than by the name itself. I wasn't in my editor and jumped to the other function when reviewing in the browser.
It's just a nit mostly.
You’re right, not waiting can potentially result in longer busy-waits, with goroutines just looping trying to acquire buffers. In practice this would call for increasing the pool size. I’m not too worried about it; I think the default pool size of 2 x GOMAXPROCS should prevent this kind of scenario from occurring, since there should always be roughly half of the buffers available to be grabbed.
Actually I wasn't running the right benchmark, we do see some differences:
In most cases the number of metrics in a batch is somewhere between 1 and 20, so we'll likely get better throughput with this change. While it may be related to contention, the benchmark doesn't really tell whether contention improved here: the block profiler picks up so much noise from synchronization within the test package that it doesn't show anything about the mutex in the buffer.
LGTM!
Thanks for the reviews guys, I'm still getting this tested in production and it seems to be helping so far; will merge after running it for a little longer.
Alright, this has been running without issues for a few days; definitely no contention anymore on this code path. I'll go ahead and merge!
This PR changes the approach taken to handle concurrent use of a stats buffer, to get rid of all locks and offer better throughput in highly concurrent environments. What motivated this change is the observation that on a high-traffic service, 50% of the time spent in locking/unlocking was on the mutexes in the `stats.(*Buffer).HandleMeasures` method (see profile attached).

The proposed solution is to use an array of buffers instead of a single buffer, with a wait-free lock on each buffer that goroutines have to acquire before writing metrics. They don't block on the lock though, so if a buffer is busy they just move on and try the next one until one becomes available.
The initial chunks to which metrics were serialized are gone as well, because sync.Pool has an internal mutex that gets hit frequently when the GC is configured to be more aggressive.
I'll be testing this in production before merging it but please take a look and let me know if anything should be changed.