
Fix unbounded memory allocation. #275


Merged
mikecdavis merged 6 commits into master from mikecdavis/OASIS-6738-cap-dispatcher-queue on Jul 2, 2020

Conversation

mikecdavis (Contributor)

Summary

  • Add bounded capacity to InMemoryQueue directly
  • Fix potential concurrency issues in InMemoryQueue when checking queue size
  • Fix QueueEventDispatcher size gauge metric
  • Fix leaking goroutines in QueueEventDispatcher
  • Run go mod tidy

Issues

  • OASIS-6735
  • OASIS-6738

@aliabbasrizvi (Contributor) left a comment

Looks good.

@@ -136,7 +136,6 @@ func (ed *QueueEventDispatcher) flushEvents() {

if err == nil {
if success {
ed.logger.Debug(fmt.Sprintf("Dispatched log event %+v", event))
Contributor: Was the message too chatty/not useful?

Contributor (Author): This line is wasting resources. The event payload (which is large) gets formatted into this string representation regardless of the log level. Our logging interface should really have a Debugf(...) method (and similar) for these kinds of formatted messages.

Contributor: Might want to keep this line, but don't parse the whole event. It can be useful.
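For context, a minimal sketch of the kind of Debugf(...) method suggested above, which defers formatting until the level check passes. The types and names below are illustrative, not the SDK's actual logging interface:

```go
package logging

import "fmt"

// LogLevel is an illustrative level type for this sketch.
type LogLevel int

const (
	LogLevelDebug LogLevel = iota
	LogLevelInfo
)

// leveledLogger is a hypothetical logger that only pays the formatting
// cost when the message will actually be emitted.
type leveledLogger struct {
	level LogLevel
}

// Debugf defers fmt formatting until after the level check, so a large
// event payload is never rendered when debug logging is disabled.
func (l *leveledLogger) Debugf(format string, args ...interface{}) {
	if l.level > LogLevelDebug {
		return
	}
	fmt.Printf(format+"\n", args...)
}
```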

)

// Queue represents a queue
type Queue interface {
Add(item interface{})
Add(item interface{}) // Should return a bool
Contributor: TODO?

Contributor (Author): Yeah, probably a TODO item, but unfortunately it would break the interface; otherwise I would have done it here. Likely we'd have to define another interface and add backwards compatibility by wrapping any old implementations into the new one.
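A rough sketch of what that follow-up could look like: a hypothetical BoundedQueue interface whose Add reports acceptance, plus an adapter that wraps existing Queue implementations for backwards compatibility (all names here are illustrative, not part of this PR):

```go
package event

// Queue mirrors the existing interface shape (only Add is shown in the
// diff above; other methods are omitted from this sketch).
type Queue interface {
	Add(item interface{})
}

// BoundedQueue is a hypothetical successor interface whose Add reports
// whether the item was actually accepted.
type BoundedQueue interface {
	Add(item interface{}) bool
}

// legacyAdapter wraps an old Queue so it satisfies BoundedQueue without
// breaking existing implementations.
type legacyAdapter struct {
	q Queue
}

// Add delegates to the wrapped queue; since the old interface cannot
// signal rejection, the adapter optimistically reports success.
func (a legacyAdapter) Add(item interface{}) bool {
	a.q.Add(item)
	return true
}
```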


if len(q.Queue) >= q.MaxSize {
q.logger.Warning("MaxQueueSize has been met. Discarding event")
return
Contributor: Shouldn't this be returning an error or something? Or is that planned in a follow-up?

Contributor (Author): We should, yes, and this is what prompted me to add that comment on the interface about returning a boolean. So for now it's a planned follow-up.
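A sketch of how the bounded Add could report the drop once the interface allows it. The MaxSize check and warning message follow the excerpt above; the mutex is assumed from the PR's concurrency fix, and the logger is simplified to a plain function:

```go
package event

import "sync"

// InMemoryQueue sketch: MaxSize and Queue mirror the excerpt above.
type InMemoryQueue struct {
	mu      sync.Mutex
	MaxSize int
	Queue   []interface{}
	warn    func(message string)
}

// Add appends item unless the bound has been reached. It returns false
// when the item is discarded so callers can react, e.g. by incrementing
// a dropped-events counter.
func (q *InMemoryQueue) Add(item interface{}) bool {
	q.mu.Lock()
	defer q.mu.Unlock()

	if len(q.Queue) >= q.MaxSize {
		q.warn("MaxQueueSize has been met. Discarding event")
		return false
	}
	q.Queue = append(q.Queue, item)
	return true
}
```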

@@ -22,14 +22,25 @@ import (
"testing"
)

func TestInMemoryQueue_Add(t *testing.T) {
@aliabbasrizvi (Contributor), Jul 1, 2020: Nit: this could probably be more descriptive, i.e. TestInMemoryQueue_Add_Queue_Size_Bounded_By_MaxSize?
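For illustration, roughly what a test under that name might assert; the NewInMemoryQueue constructor and its signature are assumed here, not taken from the diff:

```go
package event

import "testing"

func TestInMemoryQueue_Add_Queue_Size_Bounded_By_MaxSize(t *testing.T) {
	maxSize := 3
	q := NewInMemoryQueue(maxSize) // constructor signature assumed for this sketch

	// Add more items than the queue allows; the extras should be discarded.
	for i := 0; i < maxSize+2; i++ {
		q.Add(i)
	}

	if size := q.Size(); size > maxSize {
		t.Errorf("queue size %d exceeds MaxSize %d", size, maxSize)
	}
}
```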

@@ -94,24 +95,23 @@ type QueueEventDispatcher struct {
// DispatchEvent queues event with callback and calls flush in a go routine.
func (ed *QueueEventDispatcher) DispatchEvent(event LogEvent) (bool, error) {
ed.eventQueue.Add(event)
go func() {
ed.flushEvents()
Contributor: duh, hehe

ed.queueSize.Set(float64(ed.eventQueue.Size()))
for ed.eventQueue.Size() > 0 {
queueSize := ed.eventQueue.Size()
for ; queueSize > 0; queueSize = ed.eventQueue.Size() {
Contributor: I think I understand, but can you provide a quick comment explaining why this has to be in the loop?

Contributor (Author): Since Size() was only evaluated at the beginning and end of the loop, we failed to capture any queue growth that happened while the loop was being processed. Under heavy load the queue can keep growing while this loop iterates, but the size would not be captured. I discovered this while investigating runaway memory allocation: our metrics were reporting a constant size in the single digits, but in reality it was growing into the hundreds of thousands.
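A condensed sketch of the pattern after this change: Size() is re-read on every iteration so growth that happens mid-flush is observed, and the gauge set at the end reuses the last value read by the loop (which also covers the discussion below about avoiding an extra blocking Size() call). The function parameters are stand-ins for the real dispatcher's queue access, HTTP dispatch, and metrics gauge:

```go
package event

// drain empties the queue, re-evaluating size() each iteration so items
// added while earlier items are being dispatched are still counted and
// flushed; reading the size only once before the loop would miss that
// growth under load.
func drain(size func() int, pop func() interface{}, dispatch func(interface{}), setGauge func(float64)) {
	queueSize := size()
	for ; queueSize > 0; queueSize = size() {
		dispatch(pop())
		setGauge(float64(queueSize))
	}
	// Reuse the last observed size rather than making another blocking
	// Size() call; any later growth is picked up on the next flush.
	setGauge(float64(queueSize))
}
```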

@@ -159,7 +158,7 @@ func (ed *QueueEventDispatcher) flushEvents() {
ed.retryFlushCounter.Add(1)
}
}
ed.queueSize.Set(float64(ed.eventQueue.Size()))
ed.queueSize.Set(float64(queueSize))
Contributor: Shouldn't it get the right size all the time when we use ed.eventQueue.Size()? (just asking)

Contributor (Author): eventQueue.Size() is a blocking call, and I didn't feel it necessary to call it again here since it was just called in the for loop and we presumably exited with a size of zero. If the queue has changed since then, it will be captured by the next call to flushEvents().

@mikecdavis force-pushed the mikecdavis/OASIS-6738-cap-dispatcher-queue branch from 5ef675f to e580b28 on July 2, 2020 15:45
Dispatcher: NewHTTPEventDispatcher(sdkKey, nil, nil),
queueSize: dispatcherMetricsRegistry.GetGauge(metrics.DispatcherQueueSize),
retryFlushCounter: dispatcherMetricsRegistry.GetCounter(metrics.DispatcherRetryFlush),
failFlushCounter: dispatcherMetricsRegistry.GetCounter(metrics.DispatcherFailedFlush),
sucessFlush: dispatcherMetricsRegistry.GetCounter(metrics.DispatcherSuccessFlush),
logger: logging.GetLogger(sdkKey, "QueueEventDispatcher"),
logger: logger,
processing: semaphore.NewWeighted(int64(1)),
Contributor: Can we make it a const?

ed.eventFlushLock.Unlock()
}()
// Limit flushing to a single worker
if !ed.processing.TryAcquire(1) {
Contributor: So that we can use that const value here.
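A minimal sketch of the suggested constant together with the single-worker guard around the flush; the const name and the stripped-down dispatcher type are illustrative:

```go
package event

import "golang.org/x/sync/semaphore"

// singleWorker is the semaphore weight that restricts flushing to one
// goroutine at a time.
const singleWorker = 1

// dispatcher keeps only the field relevant to this sketch.
type dispatcher struct {
	processing *semaphore.Weighted
}

func newDispatcher() *dispatcher {
	return &dispatcher{processing: semaphore.NewWeighted(int64(singleWorker))}
}

// flushEvents bails out when another goroutine already holds the slot,
// so flush goroutines cannot pile up.
func (d *dispatcher) flushEvents(drainQueue func()) {
	if !d.processing.TryAcquire(singleWorker) {
		return
	}
	defer d.processing.Release(singleWorker)
	drainQueue()
}
```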

@mikecdavis merged commit 2e7c275 into master on Jul 2, 2020
@mikecdavis deleted the mikecdavis/OASIS-6738-cap-dispatcher-queue branch on July 2, 2020 23:28