Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Make sure dlq extent quotas are honored (#33)
Browse files Browse the repository at this point in the history
* Make sure dlq extent quotas are honored

* fixed typos in unit test
  • Loading branch information
venkat1109 committed Jan 25, 2017
1 parent 026daaa commit 49808c2
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 18 deletions.
7 changes: 3 additions & 4 deletions services/controllerhost/api_handlers.go
Expand Up @@ -210,13 +210,12 @@ func getDstType(desc *shared.DestinationDescription) dstType {
dstType := desc.GetType()
switch dstType {
case shared.DestinationType_PLAIN:
return dstTypePlain
case shared.DestinationType_TIMER:
return dstTypeTimer
default:
if common.PathDLQRegex.MatchString(desc.GetPath()) {
return dstTypeDLQ
}
return dstTypePlain
case shared.DestinationType_TIMER:
return dstTypeTimer
}
return dstTypePlain
}
Expand Down
13 changes: 7 additions & 6 deletions services/controllerhost/consumer.go
Expand Up @@ -351,7 +351,7 @@ func fetchClassifyOpenCGExtents(context *Context, dstUUID string, cgUUID string,
// merges their dlq to their normal destination. When this
// happens, the customer expectation is to start seeing
// messages from the merge operation immediately.
// (2) Avoid all consumed extens being DLQ extents.
// (2) Avoid all consumed extents being DLQ extents.
// This is because, a merge could potentially bring in
// a lot of dlq extents and in case, these are poison
// pills, the customer will make no progress w.r.t their
Expand Down Expand Up @@ -425,12 +425,13 @@ func selectNextExtentsToConsume(
// capacity is the target number of cgextents to achieve
capacity := maxExtentsToConsumeForDst(context, dstDesc.GetPath(), cgDesc.GetConsumerGroupName(), getDstType(dstDesc), dstDesc.GetZoneConfigs())
dlqQuota := common.MaxInt(1, capacity/4)
nAvailable := len(dstDlqExtents) + dstExtentsCount
dlqQuota = common.MaxInt(0, dlqQuota-nCGDlqExtents)

capacity = common.MaxInt(0, capacity-len(cgExtents.openHealthy))
capacity = common.MinInt(capacity, nAvailable)
nAvailable := dstExtentsCount + len(dstDlqExtents)
nConsumable := dstExtentsCount + common.MinInt(dlqQuota, len(dstDlqExtents))

dlqQuota = common.MaxInt(0, dlqQuota-nCGDlqExtents)
capacity = common.MaxInt(0, capacity-len(cgExtents.openHealthy))
capacity = common.MinInt(capacity, nConsumable)

if capacity == 0 {
if nCGDlqExtents == 0 && len(dstDlqExtents) > 0 {
Expand All @@ -453,7 +454,7 @@ func selectNextExtentsToConsume(

for i := 0; i < capacity; i++ {
if remDstDlqExtents > 0 {
if nDstDlqExtents < dlqQuota || remDstExtents == 0 {
if nDstDlqExtents < dlqQuota {
result[i] = dstDlqExtents[nDstDlqExtents]
nDstDlqExtents++
remDstDlqExtents--
Expand Down
19 changes: 11 additions & 8 deletions services/controllerhost/consumer_test.go
Expand Up @@ -217,6 +217,7 @@ func (s *McpSuite) TestCGExtentSelectorHonorsDlqQuota() {
for _, ext := range gotExtents {
if _, ok := dlqExtents[ext.GetExtentUUID()]; ok {
nDlq++
cgExtents.open[ext.GetExtentUUID()] = struct{}{}
}
}

Expand All @@ -231,10 +232,7 @@ func (s *McpSuite) TestCGExtentSelectorHonorsDlqQuota() {

gotExtents, avail, err = selectNextExtentsToConsume(context, dstDesc, cgDesc, cgExtents, metrics.GetOutputHostsScope)
s.Nil(err, "selectNextExtentsToConsume() error")
for _, ext := range gotExtents {
_, ok := dlqExtents[ext.GetExtentUUID()]
s.True(ok, "DLQ extent added, when there is room and there is no other extent available")
}
s.Equal(0, len(gotExtents), "DLQ quota not strictly honored")
}

func (s *McpSuite) TestCGExtentSelectorHonorsRemoteExtent() {
Expand Down Expand Up @@ -332,6 +330,7 @@ func (s *McpSuite) TestCGExtentSelectorWithBacklog() {
}

cgExtents := newCGExtentsByCategory()
openDLQExtents := make(map[string]struct{})
dlqExtsAvail, dlqExtsAssigned := len(dlqExtents), 0
openExtsAvail, openExtsAssigned := len(openExtents), 0

Expand All @@ -341,13 +340,14 @@ func (s *McpSuite) TestCGExtentSelectorWithBacklog() {

gotExtents, avail, err1 := selectNextExtentsToConsume(context, dstDesc, cgDesc, cgExtents, metrics.GetOutputHostsScope)
s.Nil(err1, "selectNextExtentsToConsume() error")
expectedCount := common.MinInt(maxExtentsToConsumeForDstPlain, totalAvail)

dlqQuota := common.MinInt(dlqExtsAvail, maxExtentsToConsumeForDstPlain/4-len(openDLQExtents))
dlqQuota = common.MaxInt(0, dlqQuota)

expectedCount := common.MinInt(maxExtentsToConsumeForDstPlain, (openExtsAvail + dlqQuota))
s.Equal(expectedCount, len(gotExtents), "Wrong number of next extents to consume")
s.Equal(totalAvail, avail, "Wrong number of available extents")

dlqQuota := common.MinInt(maxExtentsToConsumeForDstPlain/4, dlqExtsAvail)
dlqQuota = common.MaxInt(0, dlqQuota-dlqExtsAssigned)

for _, ext := range gotExtents {

extID := ext.GetExtentUUID()
Expand All @@ -358,6 +358,7 @@ func (s *McpSuite) TestCGExtentSelectorWithBacklog() {
// for DLQ extents. So, this will appear first
s.Equal(dlqExtents[dlqExtsAssigned], extID, "Incorrect extent serving order")
cgExtents.open[extID] = struct{}{}
openDLQExtents[extID] = struct{}{}
dlqExtsAssigned++
dlqExtsAvail--
dlqQuota--
Expand All @@ -372,6 +373,7 @@ func (s *McpSuite) TestCGExtentSelectorWithBacklog() {
// If there are still dlq extents left, then should show up next
s.Equal(dlqExtents[dlqExtsAssigned], extID, "Incorrect extent serving order")
cgExtents.open[extID] = struct{}{}
openDLQExtents[extID] = struct{}{}
dlqExtsAssigned++
dlqExtsAvail--
}
Expand All @@ -380,6 +382,7 @@ func (s *McpSuite) TestCGExtentSelectorWithBacklog() {
target := gotExtents[0].GetExtentUUID()
s.updateCGExtentStatus(cgDesc.GetConsumerGroupUUID(), target, m.ConsumerGroupExtentStatus_CONSUMED)
delete(cgExtents.open, target)
delete(openDLQExtents, target)
cgExtents.consumed[target] = struct{}{}
}
}
20 changes: 20 additions & 0 deletions services/controllerhost/controllerhost_test.go
Expand Up @@ -870,3 +870,23 @@ func (s *McpSuite) TestCreateRemoteZoneExtent() {
}
s.True(primaryValid)
}

func (s *McpSuite) TestGetDstType() {

dstType := shared.DestinationType_PLAIN

dstDesc := &shared.DestinationDescription{
Path: common.StringPtr("/unit/desttype"),
DestinationUUID: common.StringPtr(uuid.New()),
Type: &dstType,
}

s.Equal(dstTypePlain, getDstType(dstDesc), "getDstType(PLAIN) failed")
dstType = shared.DestinationType_TIMER
s.Equal(dstTypeTimer, getDstType(dstDesc), "getDstType(TIMER) failed")

// test dlq
dstType = shared.DestinationType_PLAIN
dstDesc.Path = common.StringPtr("/unit/desttype.dlq")
s.Equal(dstTypeDLQ, getDstType(dstDesc), "getDstType(TIMER) failed")
}

0 comments on commit 49808c2

Please sign in to comment.