Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Make sure dlq extent quotas are honored
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 committed Jan 23, 2017
1 parent f0355ed commit 2c68e93
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 32 deletions.
15 changes: 7 additions & 8 deletions services/controllerhost/api_handlers.go
Expand Up @@ -23,12 +23,12 @@ package controllerhost
import (
"time"

m "github.com/uber/cherami-thrift/.generated/go/metadata"
"github.com/uber/cherami-thrift/.generated/go/shared"
"github.com/uber/cherami-server/common"
"github.com/uber/cherami-server/common/metrics"
"github.com/pborman/uuid"
"github.com/uber-common/bark"
"github.com/uber/cherami-server/common"
"github.com/uber/cherami-server/common/metrics"
m "github.com/uber/cherami-thrift/.generated/go/metadata"
"github.com/uber/cherami-thrift/.generated/go/shared"
"github.com/uber/tchannel-go/thrift"
)

Expand Down Expand Up @@ -215,13 +215,12 @@ func getDstType(desc *shared.DestinationDescription) dstType {
dstType := desc.GetType()
switch dstType {
case shared.DestinationType_PLAIN:
return dstTypePlain
case shared.DestinationType_TIMER:
return dstTypeTimer
default:
if common.PathDLQRegex.MatchString(desc.GetPath()) {
return dstTypeDLQ
}
return dstTypePlain
case shared.DestinationType_TIMER:
return dstTypeTimer
}
return dstTypePlain
}
Expand Down
19 changes: 10 additions & 9 deletions services/controllerhost/consumer.go
Expand Up @@ -23,12 +23,12 @@ package controllerhost
import (
"time"

"github.com/uber-common/bark"
"github.com/uber/cherami-server/common"
"github.com/uber/cherami-server/common/metrics"
a "github.com/uber/cherami-thrift/.generated/go/admin"
m "github.com/uber/cherami-thrift/.generated/go/metadata"
"github.com/uber/cherami-thrift/.generated/go/shared"
"github.com/uber/cherami-server/common"
"github.com/uber/cherami-server/common/metrics"
"github.com/uber-common/bark"
)

const failBackoffInterval = int64(time.Millisecond * 100)
Expand Down Expand Up @@ -327,7 +327,7 @@ func fetchClassifyOpenCGExtents(context *Context, dstUUID string, cgUUID string,
// merges their dlq to their normal destination. When this
// happens, the customer expectation is to start seeing
// messages from the merge operation immediately.
// (2) Avoid all consumed extens being DLQ extents.
// (2) Avoid all consumed extents being DLQ extents.
// This is because, a merge could potentially bring in
// a lot of dlq extents and in case, these are poison
// pills, the customer will make no progress w.r.t their
Expand Down Expand Up @@ -401,12 +401,13 @@ func selectNextExtentsToConsume(
// capacity is the target number of cgextents to achieve
capacity := maxExtentsToConsumeForDstType(getDstType(dstDesc), dstDesc.GetZoneConfigs())
dlqQuota := common.MaxInt(1, capacity/4)
nAvailable := len(dstDlqExtents) + dstExtentsCount
dlqQuota = common.MaxInt(0, dlqQuota-nCGDlqExtents)

capacity = common.MaxInt(0, capacity-len(cgExtents.openHealthy))
capacity = common.MinInt(capacity, nAvailable)
nAvailable := dstExtentsCount + len(dstDlqExtents)
nConsumable := dstExtentsCount + common.MinInt(dlqQuota, len(dstDlqExtents))

dlqQuota = common.MaxInt(0, dlqQuota-nCGDlqExtents)
capacity = common.MaxInt(0, capacity-len(cgExtents.openHealthy))
capacity = common.MinInt(capacity, nConsumable)

if capacity == 0 {
if nCGDlqExtents == 0 && len(dstDlqExtents) > 0 {
Expand All @@ -429,7 +430,7 @@ func selectNextExtentsToConsume(

for i := 0; i < capacity; i++ {
if remDstDlqExtents > 0 {
if nDstDlqExtents < dlqQuota || remDstExtents == 0 {
if nDstDlqExtents < dlqQuota {
result[i] = dstDlqExtents[nDstDlqExtents]
nDstDlqExtents++
remDstDlqExtents--
Expand Down
25 changes: 14 additions & 11 deletions services/controllerhost/consumer_test.go
Expand Up @@ -23,11 +23,11 @@ package controllerhost
import (
"time"

m "github.com/uber/cherami-thrift/.generated/go/metadata"
"github.com/uber/cherami-thrift/.generated/go/shared"
"github.com/pborman/uuid"
"github.com/uber/cherami-server/common"
"github.com/uber/cherami-server/common/metrics"
"github.com/pborman/uuid"
m "github.com/uber/cherami-thrift/.generated/go/metadata"
"github.com/uber/cherami-thrift/.generated/go/shared"
)

func (s *McpSuite) TestCGExtentSelectorWithNoExtents() {
Expand Down Expand Up @@ -212,6 +212,7 @@ func (s *McpSuite) TestCGExtentSelectorHonorsDlqQuota() {
for _, ext := range gotExtents {
if _, ok := dlqExtents[ext.GetExtentUUID()]; ok {
nDlq++
cgExtents.open[ext.GetExtentUUID()] = struct{}{}
}
}

Expand All @@ -226,10 +227,7 @@ func (s *McpSuite) TestCGExtentSelectorHonorsDlqQuota() {

gotExtents, avail, err = selectNextExtentsToConsume(context, dstDesc, cgDesc, cgExtents, metrics.GetOutputHostsScope)
s.Nil(err, "selectNextExtentsToConsume() error")
for _, ext := range gotExtents {
_, ok := dlqExtents[ext.GetExtentUUID()]
s.True(ok, "DLQ extent added, when there is room and there is no other extent available")
}
s.Equal(0, len(gotExtents), "DLQ quota not strictly honored")
}

func (s *McpSuite) TestCGExtentSelectorHonorsRemoteExtent() {
Expand Down Expand Up @@ -327,6 +325,7 @@ func (s *McpSuite) TestCGExtentSelectorWithBacklog() {
}

cgExtents := newCGExtentsByCategory()
openDLQExtents := make(map[string]struct{})
dlqExtsAvail, dlqExtsAssigned := len(dlqExtents), 0
openExtsAvail, openExtsAssigned := len(openExtents), 0

Expand All @@ -336,13 +335,14 @@ func (s *McpSuite) TestCGExtentSelectorWithBacklog() {

gotExtents, avail, err1 := selectNextExtentsToConsume(context, dstDesc, cgDesc, cgExtents, metrics.GetOutputHostsScope)
s.Nil(err1, "selectNextExtentsToConsume() error")
expectedCount := common.MinInt(maxExtentsToConsumeForDstPlain, totalAvail)

dlqQuota := common.MinInt(dlqExtsAvail, maxExtentsToConsumeForDstPlain/4-len(openDLQExtents))
dlqQuota = common.MaxInt(0, dlqQuota)

expectedCount := common.MinInt(maxExtentsToConsumeForDstPlain, (openExtsAvail + dlqQuota))
s.Equal(expectedCount, len(gotExtents), "Wrong number of next extents to consume")
s.Equal(totalAvail, avail, "Wrong number of available extents")

dlqQuota := common.MinInt(maxExtentsToConsumeForDstPlain/4, dlqExtsAvail)
dlqQuota = common.MaxInt(0, dlqQuota-dlqExtsAssigned)

for _, ext := range gotExtents {

extID := ext.GetExtentUUID()
Expand All @@ -353,6 +353,7 @@ func (s *McpSuite) TestCGExtentSelectorWithBacklog() {
// for DLQ extents. So, this will appear first
s.Equal(dlqExtents[dlqExtsAssigned], extID, "Incorrect extent serving order")
cgExtents.open[extID] = struct{}{}
openDLQExtents[extID] = struct{}{}
dlqExtsAssigned++
dlqExtsAvail--
dlqQuota--
Expand All @@ -367,6 +368,7 @@ func (s *McpSuite) TestCGExtentSelectorWithBacklog() {
// If there are still dlq extents left, then should show up next
s.Equal(dlqExtents[dlqExtsAssigned], extID, "Incorrect extent serving order")
cgExtents.open[extID] = struct{}{}
openDLQExtents[extID] = struct{}{}
dlqExtsAssigned++
dlqExtsAvail--
}
Expand All @@ -375,6 +377,7 @@ func (s *McpSuite) TestCGExtentSelectorWithBacklog() {
target := gotExtents[0].GetExtentUUID()
s.updateCGExtentStatus(cgDesc.GetConsumerGroupUUID(), target, m.ConsumerGroupExtentStatus_CONSUMED)
delete(cgExtents.open, target)
delete(openDLQExtents, target)
cgExtents.consumed[target] = struct{}{}
}
}
28 changes: 24 additions & 4 deletions services/controllerhost/controllerhost_test.go
Expand Up @@ -29,16 +29,16 @@ import (
"testing"
"time"

c "github.com/uber/cherami-thrift/.generated/go/controller"
m "github.com/uber/cherami-thrift/.generated/go/metadata"
"github.com/uber/cherami-thrift/.generated/go/shared"
mc "github.com/uber/cherami-server/clients/metadata"
"github.com/uber/cherami-server/common"
"github.com/uber/cherami-server/common/configure"
dconfig "github.com/uber/cherami-server/common/dconfigclient"
mockcommon "github.com/uber/cherami-server/test/mocks/common"
"github.com/uber/cherami-server/test"
mockcommon "github.com/uber/cherami-server/test/mocks/common"
mockreplicator "github.com/uber/cherami-server/test/mocks/replicator"
c "github.com/uber/cherami-thrift/.generated/go/controller"
m "github.com/uber/cherami-thrift/.generated/go/metadata"
"github.com/uber/cherami-thrift/.generated/go/shared"

"github.com/pborman/uuid"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -870,3 +870,23 @@ func (s *McpSuite) TestCreateRemoteZoneExtent() {
}
s.True(primaryValid)
}

func (s *McpSuite) TestGetDstType() {

dstType := shared.DestinationType_PLAIN

dstDesc := &shared.DestinationDescription{
Path: common.StringPtr("/unit/desttype"),
DestinationUUID: common.StringPtr(uuid.New()),
Type: &dstType,
}

s.Equal(dstTypePlain, getDstType(dstDesc), "getDstTyple(PLAIN) failed")
dstType = shared.DestinationType_TIMER
s.Equal(dstTypeTimer, getDstType(dstDesc), "getDstTyple(TIMER) failed")

// test dlq
dstType = shared.DestinationType_PLAIN
dstDesc.Path = common.StringPtr("/unit/desttype.dlq")
s.Equal(dstTypeDLQ, getDstType(dstDesc), "getDstTyple(TIMER) failed")
}

0 comments on commit 2c68e93

Please sign in to comment.