-
Notifications
You must be signed in to change notification settings - Fork 102
Make message cache size configurable per CG #39
Conversation
This patch makes sure we can configure the message cache size per CG. Also modifies the integration test to use the new config rather than overwriting the default value.
We should move the cassandra based cfg manager behind the config manager interface rather than using the hard type.
Modify message cache test to use the config manager mock interface and update the args passed to the newMessageDeliveryCache.
// get the initial credits based on the message cache size | ||
cfg, err := cgCache.getDynamicCgConfig() | ||
if err == nil { | ||
extCache.initialCredits = cgCache.getMessageCacheSize(cfg, defaultNumOutstandingMsgs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to set extCache.initialCredits to a default value if the err != nil, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@GuillaumeBailey it is set already above in line 312 (already existing code).
// OutputCgConfig is the per cg config used by the | ||
// cassandra config manager | ||
OutputCgConfig struct { | ||
MessageCacheSize []string `name:"messagecachesize" default:"/=10000"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add a comment stating that each entry in the slice is a tuple or change the name of the member to reflect that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
forgot to add this.. will add it now!
@@ -781,6 +786,8 @@ func (msgCache *cgMsgCache) manageMessageDeliveryCache() { | |||
msgCache.numAcks = 0 | |||
} | |||
} | |||
case <-msgCache.cfgRefreshTicker.C: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need another ticker ? Can you do this refresh whenever redeliveryTicker fires ? Since the configMgr already caches the object, calling Get() should be light-weight.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I was mainly worried about the calls to cassandra every 500ms (which is the redeliveryTicker) but since it is caching it, i will update to use the same ticker..
@@ -781,6 +786,8 @@ func (msgCache *cgMsgCache) manageMessageDeliveryCache() { | |||
msgCache.numAcks = 0 | |||
} | |||
} | |||
case <-msgCache.cfgRefreshTicker.C: | |||
msgCache.refreshCgConfig(msgCache.maxOutstandingMsgs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The creditBatchSize (declared above) is derived out of this maxOutstandingMsgs, so when this gets refreshed, you likely also want to adjust the credit batch size to the store.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! will fix it.
(1) Use the already existing redleivery ticker for refreshing the config as well since configMgr already caches the object. (2) Make sure creditBatchSize is updated.
This patch makes sure we can configure the message cache size per
CG and has the following pieces:
(1) Move the cassandra based config manager behind the interface so that we can easily write tests
(2) Update existing code which got the hard type to get the interface
(3) Modify outputhost to get the new message cache size value from configMgr. Update tests to use this value.