Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Make sure dlq extent quotas are honored #33

Merged
merged 5 commits into from
Jan 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions services/controllerhost/api_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,12 @@ func getDstType(desc *shared.DestinationDescription) dstType {
dstType := desc.GetType()
switch dstType {
case shared.DestinationType_PLAIN:
return dstTypePlain
case shared.DestinationType_TIMER:
return dstTypeTimer
default:
if common.PathDLQRegex.MatchString(desc.GetPath()) {
return dstTypeDLQ
}
return dstTypePlain
case shared.DestinationType_TIMER:
return dstTypeTimer
}
return dstTypePlain
}
Expand Down
13 changes: 7 additions & 6 deletions services/controllerhost/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func fetchClassifyOpenCGExtents(context *Context, dstUUID string, cgUUID string,
// merges their dlq to their normal destination. When this
// happens, the customer expectation is to start seeing
// messages from the merge operation immediately.
// (2) Avoid all consumed extens being DLQ extents.
// (2) Avoid all consumed extents being DLQ extents.
// This is because, a merge could potentially bring in
// a lot of dlq extents and in case, these are poison
// pills, the customer will make no progress w.r.t their
Expand Down Expand Up @@ -425,12 +425,13 @@ func selectNextExtentsToConsume(
// capacity is the target number of cgextents to achieve
capacity := maxExtentsToConsumeForDst(context, dstDesc.GetPath(), cgDesc.GetConsumerGroupName(), getDstType(dstDesc), dstDesc.GetZoneConfigs())
dlqQuota := common.MaxInt(1, capacity/4)
nAvailable := len(dstDlqExtents) + dstExtentsCount
dlqQuota = common.MaxInt(0, dlqQuota-nCGDlqExtents)

capacity = common.MaxInt(0, capacity-len(cgExtents.openHealthy))
capacity = common.MinInt(capacity, nAvailable)
nAvailable := dstExtentsCount + len(dstDlqExtents)
nConsumable := dstExtentsCount + common.MinInt(dlqQuota, len(dstDlqExtents))

dlqQuota = common.MaxInt(0, dlqQuota-nCGDlqExtents)
capacity = common.MaxInt(0, capacity-len(cgExtents.openHealthy))
capacity = common.MinInt(capacity, nConsumable)

if capacity == 0 {
if nCGDlqExtents == 0 && len(dstDlqExtents) > 0 {
Expand All @@ -453,7 +454,7 @@ func selectNextExtentsToConsume(

for i := 0; i < capacity; i++ {
if remDstDlqExtents > 0 {
if nDstDlqExtents < dlqQuota || remDstExtents == 0 {
if nDstDlqExtents < dlqQuota {
result[i] = dstDlqExtents[nDstDlqExtents]
nDstDlqExtents++
remDstDlqExtents--
Expand Down
19 changes: 11 additions & 8 deletions services/controllerhost/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (s *McpSuite) TestCGExtentSelectorHonorsDlqQuota() {
for _, ext := range gotExtents {
if _, ok := dlqExtents[ext.GetExtentUUID()]; ok {
nDlq++
cgExtents.open[ext.GetExtentUUID()] = struct{}{}
}
}

Expand All @@ -231,10 +232,7 @@ func (s *McpSuite) TestCGExtentSelectorHonorsDlqQuota() {

gotExtents, avail, err = selectNextExtentsToConsume(context, dstDesc, cgDesc, cgExtents, metrics.GetOutputHostsScope)
s.Nil(err, "selectNextExtentsToConsume() error")
for _, ext := range gotExtents {
_, ok := dlqExtents[ext.GetExtentUUID()]
s.True(ok, "DLQ extent added, when there is room and there is no other extent available")
}
s.Equal(0, len(gotExtents), "DLQ quota not strictly honored")
}

func (s *McpSuite) TestCGExtentSelectorHonorsRemoteExtent() {
Expand Down Expand Up @@ -332,6 +330,7 @@ func (s *McpSuite) TestCGExtentSelectorWithBacklog() {
}

cgExtents := newCGExtentsByCategory()
openDLQExtents := make(map[string]struct{})
dlqExtsAvail, dlqExtsAssigned := len(dlqExtents), 0
openExtsAvail, openExtsAssigned := len(openExtents), 0

Expand All @@ -341,13 +340,14 @@ func (s *McpSuite) TestCGExtentSelectorWithBacklog() {

gotExtents, avail, err1 := selectNextExtentsToConsume(context, dstDesc, cgDesc, cgExtents, metrics.GetOutputHostsScope)
s.Nil(err1, "selectNextExtentsToConsume() error")
expectedCount := common.MinInt(maxExtentsToConsumeForDstPlain, totalAvail)

dlqQuota := common.MinInt(dlqExtsAvail, maxExtentsToConsumeForDstPlain/4-len(openDLQExtents))
dlqQuota = common.MaxInt(0, dlqQuota)

expectedCount := common.MinInt(maxExtentsToConsumeForDstPlain, (openExtsAvail + dlqQuota))
s.Equal(expectedCount, len(gotExtents), "Wrong number of next extents to consume")
s.Equal(totalAvail, avail, "Wrong number of available extents")

dlqQuota := common.MinInt(maxExtentsToConsumeForDstPlain/4, dlqExtsAvail)
dlqQuota = common.MaxInt(0, dlqQuota-dlqExtsAssigned)

for _, ext := range gotExtents {

extID := ext.GetExtentUUID()
Expand All @@ -358,6 +358,7 @@ func (s *McpSuite) TestCGExtentSelectorWithBacklog() {
// for DLQ extents. So, this will appear first
s.Equal(dlqExtents[dlqExtsAssigned], extID, "Incorrect extent serving order")
cgExtents.open[extID] = struct{}{}
openDLQExtents[extID] = struct{}{}
dlqExtsAssigned++
dlqExtsAvail--
dlqQuota--
Expand All @@ -372,6 +373,7 @@ func (s *McpSuite) TestCGExtentSelectorWithBacklog() {
// If there are still dlq extents left, then should show up next
s.Equal(dlqExtents[dlqExtsAssigned], extID, "Incorrect extent serving order")
cgExtents.open[extID] = struct{}{}
openDLQExtents[extID] = struct{}{}
dlqExtsAssigned++
dlqExtsAvail--
}
Expand All @@ -380,6 +382,7 @@ func (s *McpSuite) TestCGExtentSelectorWithBacklog() {
target := gotExtents[0].GetExtentUUID()
s.updateCGExtentStatus(cgDesc.GetConsumerGroupUUID(), target, m.ConsumerGroupExtentStatus_CONSUMED)
delete(cgExtents.open, target)
delete(openDLQExtents, target)
cgExtents.consumed[target] = struct{}{}
}
}
20 changes: 20 additions & 0 deletions services/controllerhost/controllerhost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,3 +870,23 @@ func (s *McpSuite) TestCreateRemoteZoneExtent() {
}
s.True(primaryValid)
}

func (s *McpSuite) TestGetDstType() {

dstType := shared.DestinationType_PLAIN

dstDesc := &shared.DestinationDescription{
Path: common.StringPtr("/unit/desttype"),
DestinationUUID: common.StringPtr(uuid.New()),
Type: &dstType,
}

s.Equal(dstTypePlain, getDstType(dstDesc), "getDstType(PLAIN) failed")
dstType = shared.DestinationType_TIMER
s.Equal(dstTypeTimer, getDstType(dstDesc), "getDstType(TIMER) failed")

// test dlq
dstType = shared.DestinationType_PLAIN
dstDesc.Path = common.StringPtr("/unit/desttype.dlq")
s.Equal(dstTypeDLQ, getDstType(dstDesc), "getDstType(TIMER) failed")
}