Conversation
-	retryAtNanos *atomic.Int64
-	retried      *atomic.Int64
 	isAcked      *atomic.Bool
+	retryAtNanos int64
Is this only accessed from one goroutine?
Yep
Cool, do you still need isAcked to be atomic then? Or is it that some of these are accessed only from one goroutine, and others could be accessed concurrently? If the latter, I would recommend adding short comments next to these fields to explain their access patterns, to save future readers any confusion around this.
Yeah, modified the comment.
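A minimal sketch of what those access-pattern comments might look like. The field set follows the diff above, but the exact struct layout, `retried` being a plain int, and the use of go.uber.org/atomic are assumptions:

```go
package writer

import "go.uber.org/atomic"

type message struct {
	// retryAtNanos and retried are only read and written by the single
	// background retry goroutine, so plain (non-atomic) types are safe.
	retryAtNanos int64
	retried      int

	// isAcked can be set by the ack-handling goroutine while the retry
	// goroutine reads it, so it must stay atomic.
	isAcked *atomic.Bool
}

func newMessage() *message {
	return &message{isAcked: atomic.NewBool(false)}
}
```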
@@ -132,6 +131,8 @@ func newConsumerServiceWriter(
 		return nil, errUnknownConsumptionType
 	}
 	router := newAckRouter(int(numShards))
+	mPool := newMessagePool(opts.MessagePoolOptions())
FYI, in some cases adding a pool does more harm than good to performance, because the overhead of lock contention outweighs the benefit of reduced GC. That's not necessarily true for your case, but I've observed it for the samples on the agg tier (you need one sample per timer value), so it's something worth keeping in mind.
Yeah, I do notice the lock contention in the pool here is heavy as well, but it still saves tons of allocations and GC in my case. I feel one pool per consumer service is kinda OK for now; in the worst case I could try one pool per message writer, but I'm not going to go that far right now...
Have you tried disabling the pool completely and observing the CPU impact? Again, it may very well be the case that pooling still does you more good than harm, but I'm just curious.
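One self-contained way to get a first read on that question: a microbenchmark comparing pooled against freshly allocated messages under parallel load. This uses sync.Pool as a stand-in; the actual m3msg pool may be implemented differently (e.g., with an explicit lock), in which case contention would show up more strongly:

```go
package pool_test

import (
	"sync"
	"testing"
)

// msg stands in for the real message type; the 256-byte payload is an
// arbitrary size chosen for illustration.
type msg struct{ buf [256]byte }

var pool = sync.Pool{New: func() interface{} { return new(msg) }}

// sink keeps allocations from being optimized away by escape analysis.
var sink *msg

// BenchmarkWithPool measures get/put round-trips under parallel load,
// which is where contention overhead would show up.
func BenchmarkWithPool(b *testing.B) {
	b.ReportAllocs()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			m := pool.Get().(*msg)
			m.buf[0] = 1 // touch the memory so the work isn't elided
			pool.Put(m)
		}
	})
}

// BenchmarkWithoutPool allocates a fresh message each iteration and lets
// the GC reclaim it, mimicking a disabled (nil) pool.
func BenchmarkWithoutPool(b *testing.B) {
	b.ReportAllocs()
	b.RunParallel(func(pb *testing.PB) {
		var local *msg
		for pb.Next() {
			local = new(msg)
			local.buf[0] = 1
		}
		sink = local // one write per goroutine, forces heap allocation
	})
}
```

Running both with `go test -bench . -benchmem` shows the allocation savings side by side with any contention cost.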
producer/writer/message.go
Outdated
@@ -24,7 +24,6 @@ import (
 	"github.com/m3db/m3msg/generated/proto/msgpb"
 	"github.com/m3db/m3msg/producer"
 	"github.com/m3db/m3msg/protocol/proto"
New line
sg
producer/writer/message_writer.go
Outdated
-	m.Close()
-	w.mPool.Put(m)
+	if w.mPool != nil {
+		m.Close()
So if pool is nil, you don't need to close the message?
I don't; that Close() is only needed when the message can be reused.
producer/writer/message_writer.go
Outdated
@@ -335,8 +342,10 @@ func (w *messageWriterImpl) retryBatchWithLock(
 			// do not stay in memory forever.
 			w.Ack(m.Metadata())
 			w.queue.Remove(e)
-			m.Close()
-			w.mPool.Put(m)
+			if w.mPool != nil {
I'd actually pull this out into a member function of the writer, e.g. freeMessage, so you can reuse it.
Same goes for newMessage above.
sg
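A sketch of the suggested refactor, using the helper names from the thread (freeMessage, newMessage) and the writer/pool identifiers from the diffs above; the bodies are illustrative assumptions, not the PR's final code:

```go
// newMessage gets a message from the pool when pooling is enabled and
// falls back to a plain allocation when the pool is nil (pooling disabled).
func (w *messageWriterImpl) newMessage() *message {
	if w.mPool != nil {
		return w.mPool.Get()
	}
	return newMessage()
}

// freeMessage recycles a message. Close() resets internal state for reuse,
// which is only needed when the message goes back into the pool.
func (w *messageWriterImpl) freeMessage(m *message) {
	if w.mPool == nil {
		return
	}
	m.Close()
	w.mPool.Put(m)
}
```

Call sites like retryBatchWithLock then collapse to a single w.freeMessage(m), with the nil-pool check in one place.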
LGTM
Instead of having all message writers across all consumer services share the same global message pool, this PR creates a message pool per consumer service, which reduces lock contention on the pool.
Remove unnecessary atomic usage in message.go.
Move the following expensive operations away from Write() to the background retry thread.
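For context, a hedged sketch of the per-service wiring the description refers to: one pool per consumer service, shared by that service's message writers but not across services. The newMessageWriter signature and the surrounding function are assumptions for illustration:

```go
func newWritersForService(opts Options, numShards uint32) []*messageWriterImpl {
	// One pool per consumer service: shared by this service's writers,
	// but not with other services, which bounds lock contention.
	mPool := newMessagePool(opts.MessagePoolOptions())
	writers := make([]*messageWriterImpl, 0, numShards)
	for shard := uint32(0); shard < numShards; shard++ {
		writers = append(writers, newMessageWriter(shard, mPool, opts))
	}
	return writers
}
```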