Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Revert "Slowdown redeliveries, add timeout metrics (#290)" (#316)
Browse files Browse the repository at this point in the history
This reverts commit 464433e.
  • Loading branch information
Kiran RG committed Oct 18, 2017
1 parent fc00e42 commit 3ef749a
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 48 deletions.
1 change: 0 additions & 1 deletion cmd/tools/common/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,6 @@ func SetAdminCommands(commands *[]cli.Command) {
"cherami-storehost", "adminStatus", if set to anything other than "enabled", will prevent placement of new extent on this store
"cherami-storehost", "minFreeDiskSpaceBytes", integer, minimum required free disk space in bytes to place a new extent
"cherami-outputhost", "messagecachesize", comma-separated list of "destination/CG_name=value" for message cache size
"cherami-outputhost", "redeliveryIntervalInMs", the message redelivery interval in milliseconds
`,
Subcommands: []cli.Command{
{
Expand Down
3 changes: 0 additions & 3 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,8 +838,6 @@ const (
OutputhostCGMessageSentAck
// OutputhostCGMessageSentNAck records the count of nack messages
OutputhostCGMessageSentNAck
// OutputhostCGMessageTimeout records the count of timeout messages
OutputhostCGMessageTimeout
// OutputhostCGMessagesThrottled records the count of messages throttled
OutputhostCGMessagesThrottled
// OutputhostCGAckMgrSeqNotFound is the gauge to track acks whose seq number is not found
Expand Down Expand Up @@ -1358,7 +1356,6 @@ var dynamicMetricDefs = map[ServiceIdx]map[int]metricDefinition{
OutputhostCGMessageRedelivered: {Counter, "outputhost.message.redelivered.cg"},
OutputhostCGMessageSentAck: {Counter, "outputhost.message.sent-ack.cg"},
OutputhostCGMessageSentNAck: {Counter, "outputhost.message.sent-nack.cg"},
OutputhostCGMessageTimeout: {Counter, "outputhost.message.timeout.cg"},
OutputhostCGMessagesThrottled: {Counter, "outputhost.message.throttled"},
OutputhostCGAckMgrSeqNotFound: {Counter, "outputhost.ackmgr.seq-not-found.cg"},
OutputhostCGReplicaReconnect: {Counter, "outputhost.replica-reconnect.cg"},
Expand Down
11 changes: 0 additions & 11 deletions services/outputhost/cgcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,17 +669,6 @@ func (cgCache *consumerGroupCache) getMessageCacheSize(cfg OutputCgConfig, oldSi
return cacheSize
}

//
func (cgCache *consumerGroupCache) getRedeliveryInterval(cfg OutputCgConfig, oldInterval int32) (interval int32) {
logFn := func() bark.Logger {
return cgCache.logger
}
ruleKey := cgCache.destPath + `/` + cgCache.cachedCGDesc.GetConsumerGroupName()
interval = int32(common.OverrideValueByPrefix(logFn, ruleKey, cfg.RedeliveryIntervalInMs, int64(oldInterval), `redeliveryIntervalInMs`))

return interval
}

// loadConsumerGroupCache loads everything on this cache including the extents and within the cache
func (cgCache *consumerGroupCache) loadConsumerGroupCache(ctx thrift.Context, exists bool) error {
cgCache.extMutex.Lock()
Expand Down
4 changes: 0 additions & 4 deletions services/outputhost/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ type (
// with different size config as follows:
// "/test/destination//test/cg_1=50,/test/destination//test/cg_2=100"
MessageCacheSize []string `name:"messagecachesize" default:"/=10000"`

// RedeliveryIntervalInMs is used to configure the per CG message redelivery interval.
// Just like MessageCacheSize above
RedeliveryIntervalInMs []string `name:"redeliveryIntervalInMs" default:"/=100"`
}
)

Expand Down
37 changes: 8 additions & 29 deletions services/outputhost/messagecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ type AckID string
const defaultLockTimeoutInSeconds int32 = 42
const defaultMaxDeliveryCount int32 = 2
const blockCheckingTimeout time.Duration = time.Minute
const defaultRedeliveryIntervalInMs = 100
const redeliveryInterval = time.Second / 2

// SmartRetryDisableString can be added to a destination or CG owner email to request smart retry to be disabled
// Note that Google allows something like this: gbailey+smartRetryDisable@uber.com
Expand Down Expand Up @@ -261,12 +261,11 @@ type cgMsgCache struct {
notifier Notifier // this notifier is used to slow down cons connections based on NACKs
consumerHealth
messageCacheHealth
creditNotifyCh chan<- int32 // this is the notify ch to notify credits to extents
creditRequestCh <-chan string // read-only channel used by the extents to request credits specifically for that extent.
maxOutstandingMsgs int32 // max allowed outstanding messages
numAcks int32 // num acks we received
cgCache *consumerGroupCache // just a reference to the cgCache to grant credits to a local extent directly
redeliveryIntervalInMs int32 // redelivery ticker interval
creditNotifyCh chan<- int32 // this is the notify ch to notify credits to extents
creditRequestCh <-chan string // read-only channel used by the extents to request credits specifically for that extent.
maxOutstandingMsgs int32 // max allowed outstanding messages
numAcks int32 // num acks we received
cgCache *consumerGroupCache // just a reference to the cgCache to grant credits to a local extent directly
shared.ConsumerGroupDescription
}

Expand Down Expand Up @@ -384,7 +383,6 @@ func (msgCache *cgMsgCache) utilHandleRedeliveryTicker() {
msgCache.startTimer(eventTimer)

var redeliveries int64
var timeouts int64

now := common.Now()

Expand All @@ -403,7 +401,7 @@ func (msgCache *cgMsgCache) utilHandleRedeliveryTicker() {
msgCache.reinjectlastAckMsg() // injects the last acked message if it is not already injected
}

for i, cache := range timerCaches {
for _, cache := range timerCaches {

nExpired := 0

Expand Down Expand Up @@ -441,10 +439,6 @@ func (msgCache *cgMsgCache) utilHandleRedeliveryTicker() {
redeliveries++
msgCache.consumerHealth.lastRedeliveryTime = now // don't use variable 'now', since processing may be very slow in degenerate cases; consider moving to utilHandleRedeliveredMsg

if i == 0 { // means the message is from the redeliveryTimerCache, where the timeout messages are
timeouts++
}

if !stalled && cm.dlqInhibit > 0 {
cm.dlqInhibit-- // Use up one of the 'extra lives', but only if the consumer group seems healthy
}
Expand Down Expand Up @@ -476,11 +470,6 @@ func (msgCache *cgMsgCache) utilHandleRedeliveryTicker() {

} // for timercaches

// Report timeout metrics
if timeouts > 0 {
msgCache.consumerM3Client.AddCounter(metrics.ConsConnectionScope, metrics.OutputhostCGMessageTimeout, timeouts)
}

// Report redelivery metrics; consider moving to utilHandleRedeliveredMsg
if redeliveries > 0 {
msgCache.m3Client.AddCounter(metrics.ConsConnectionStreamScope, metrics.OutputhostMessageRedelivered, redeliveries)
Expand Down Expand Up @@ -990,21 +979,12 @@ func (msgCache *cgMsgCache) updateConn(connID int, event msgEvent) {

func (msgCache *cgMsgCache) refreshCgConfig(oldOutstandingMessages int32) {
outstandingMsgs := oldOutstandingMessages
redeliveryIntervalInMs := msgCache.redeliveryIntervalInMs

cfg, err := msgCache.cgCache.getDynamicCgConfig()
if err == nil {
outstandingMsgs = msgCache.cgCache.getMessageCacheSize(cfg, oldOutstandingMessages)
redeliveryIntervalInMs = msgCache.cgCache.getRedeliveryInterval(cfg, msgCache.redeliveryIntervalInMs)
}

msgCache.maxOutstandingMsgs = outstandingMsgs

if redeliveryIntervalInMs != msgCache.redeliveryIntervalInMs {
msgCache.redeliveryTicker.Stop()
msgCache.redeliveryTicker = time.NewTicker(time.Duration(redeliveryIntervalInMs) * time.Millisecond)
msgCache.redeliveryIntervalInMs = redeliveryIntervalInMs
}
}

// TODO: Make the delivery cache shared among all consumer groups
Expand Down Expand Up @@ -1032,15 +1012,14 @@ func newMessageDeliveryCache(
ackMsgCh: cgCache.ackMsgCh,
nackMsgCh: cgCache.nackMsgCh,
closeChannel: make(chan struct{}),
redeliveryTicker: time.NewTicker(defaultRedeliveryIntervalInMs * time.Millisecond),
redeliveryTicker: time.NewTicker(redeliveryInterval),
ConsumerGroupDescription: cgCache.cachedCGDesc,
consumerM3Client: cgCache.consumerM3Client,
m3Client: cgCache.m3Client,
notifier: cgCache.notifier,
creditNotifyCh: cgCache.creditNotifyCh,
creditRequestCh: cgCache.creditRequestCh,
cgCache: cgCache, // just a reference to the cgCache
redeliveryIntervalInMs: defaultRedeliveryIntervalInMs,
consumerHealth: consumerHealth{
badConns: make(map[int]int),
lastAckTime: common.Now(), // Need to start in the non-stalled state
Expand Down

0 comments on commit 3ef749a

Please sign in to comment.