Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
BugFix: Prevent msgCache from generating duplicate redelivery messages
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 committed Jan 20, 2017
1 parent f7a4104 commit 41cb432
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 8 deletions.
24 changes: 16 additions & 8 deletions services/outputhost/messagecache.go
Expand Up @@ -27,11 +27,11 @@ import (

"github.com/uber-common/bark"

"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/cherami-thrift/.generated/go/shared"
"github.com/uber/cherami-server/common"
"github.com/uber/cherami-server/common/metrics"
"github.com/uber/cherami-server/services/outputhost/load"
"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/cherami-thrift/.generated/go/shared"
)

// +----------------------------------+
Expand Down Expand Up @@ -399,18 +399,20 @@ func (msgCache *cgMsgCache) utilHandleRedeliveryTicker(badConns map[int]int) {
}

for _, cache := range timerCaches {

nExpired := 0

thisCache:
for i, entry := range *cache {
for _, entry := range *cache {

// When we reach the first entry that shouldn't be fired,
// remove all previous entries and break
// break and remove all prior entries
if entry.fireTime > now {
if i > 0 {
*cache = (*cache)[i:]
}
break thisCache
}

nExpired++

ackID := entry.AckID
cm := msgCache.getState(ackID)

Expand All @@ -421,7 +423,6 @@ func (msgCache *cgMsgCache) utilHandleRedeliveryTicker(badConns map[int]int) {

switch cm.currentState {
case stateDelivered:

// Check if we need to put the message to DLQ or if we need to redeliver.
// We put the msg to DLQ on these conditions
// 1. We have already redelivered upto the max delivery count
Expand Down Expand Up @@ -457,6 +458,13 @@ func (msgCache *cgMsgCache) utilHandleRedeliveryTicker(badConns map[int]int) {
panic("Unhandled msgCache state: " + getStateString(cm.currentState) + " <- " + getStateString(cm.previousState) + ` (` + string(ackID) + `)`)
} // switch cm.currentState
} // for thisCache

if nExpired > 0 {
// remove all the entries that expired
// above by simply advancing the slice
*cache = (*cache)[nExpired:]
}

} // for timercaches

// Report redelivery metrics; consider moving to utilHandleRedeliveredMsg
Expand Down
163 changes: 163 additions & 0 deletions services/outputhost/messagecache_test.go
@@ -0,0 +1,163 @@
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package outputhost

import (
"strconv"
"testing"
"time"

log "github.com/Sirupsen/logrus"
"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-common/bark"
"github.com/uber/cherami-server/common"
"github.com/uber/cherami-server/common/metrics"
"github.com/uber/cherami-server/services/outputhost/load"
"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/cherami-thrift/.generated/go/shared"
)

type MessageCacheSuite struct {
*require.Assertions // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, not merely log an error
suite.Suite
msgCache *cgMsgCache
msgRedeliveryCh chan *cherami.ConsumerMessage
}

func TestMessageCacheTestSuite(t *testing.T) {
suite.Run(t, new(MessageCacheSuite))
}

func (s *MessageCacheSuite) SetupTest() {

s.Assertions = require.New(s.T()) // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil

cgStatus := shared.ConsumerGroupStatus_ENABLED

cgDesc := &shared.ConsumerGroupDescription{
ConsumerGroupUUID: common.StringPtr(uuid.New()),
DestinationUUID: common.StringPtr(uuid.New()),
ConsumerGroupName: common.StringPtr("/unittest/msgCache_cg"),
Status: &cgStatus,
MaxDeliveryCount: common.Int32Ptr(10),
LockTimeoutSeconds: common.Int32Ptr(1),
OwnerEmail: common.StringPtr("foo+smartRetryDisable@uber.com"),
}

cgCache := &consumerGroupCache{
cachedCGDesc: *cgDesc,
cgMetrics: load.NewCGMetrics(),
}

s.msgRedeliveryCh = make(chan *cherami.ConsumerMessage, 128)

s.msgCache = newMessageDeliveryCache(
s.msgRedeliveryCh,
make(chan *cherami.ConsumerMessage, 32),
make(chan cacheMsg, 32),
make(chan cacheMsg, 32),
make(chan timestampedAckID, 32),
make(chan timestampedAckID, 32),
*cgDesc,
nil,
bark.NewLoggerFromLogrus(log.New()),
&mockM3Client{},
&mockM3Client{},
&mockNotifier{},
make(chan int32, 8),
make(chan string, 8),
128,
cgCache)
}

func (s *MessageCacheSuite) TearDownTest() {
}

func (s *MessageCacheSuite) TestTimerQueueCleanupAfterRedelivery() {

ackID := 101
ackIDMap := make(map[string]struct{})

for i := 0; i < 100; i++ {

ackIDStr := strconv.Itoa(ackID)

cMsg := cacheMsg{
connID: 99,
msg: &cherami.ConsumerMessage{
EnqueueTimeUtc: common.Int64Ptr(int64(time.Now().UnixNano())),
AckId: &ackIDStr,
Lsn: common.Int64Ptr(87666),
Address: common.Int64Ptr(88888),
Payload: &cherami.PutMessage{
ID: common.StringPtr("64"),
Data: []byte("abc"),
},
},
}

cMsg.msg.AckId = common.StringPtr(ackIDStr)
s.msgCache.utilHandleDeliveredMsg(cMsg, nil)
ackIDMap[ackIDStr] = struct{}{}
ackID++
}

time.Sleep(time.Second)

s.msgCache.utilHandleRedeliveryTicker(nil)
s.Equal(100, len(s.msgRedeliveryCh), "Unexpected message cache redelivery")

for i := 0; i < 100; i++ {
m := <-s.msgRedeliveryCh
_, ok := ackIDMap[m.GetAckId()]
s.True(ok, "message cache redelivery for unknown ackID %v", m.GetAckId())
delete(ackIDMap, m.GetAckId())
}

s.msgCache.utilHandleRedeliveryTicker(nil)
s.Equal(0, len(s.msgRedeliveryCh), "Unexpected message cache redelivery")
}

// mocks go here

type mockNotifier struct{}

func (m *mockNotifier) Register(id int, notifyCh chan NotifyMsg) {}
func (m *mockNotifier) Unregister(id int) {}
func (m *mockNotifier) Notify(id int, notifyMsg NotifyMsg) {}

type mockM3Client struct{}
type mockStopwatch struct{}

func (m *mockM3Client) IncCounter(op int, idx int) {}
func (m *mockM3Client) AddCounter(op int, idx int, delta int64) {}
func (m *mockM3Client) UpdateGauge(op int, idx int, delta int64) {}
func (m *mockM3Client) RecordTimer(op int, idx int, delta time.Duration) {}
func (m *mockM3Client) StartTimer(scopeIdx int, timerIdx int) metrics.Stopwatch {
watch := &mockStopwatch{}
return watch
}
func (m *mockM3Client) GetParentReporter() metrics.Reporter { return nil }
func (m *mockStopwatch) Stop() time.Duration {
return time.Second
}

0 comments on commit 41cb432

Please sign in to comment.