Skip to content

Commit

Permalink
Fix stopped partition detection when there was a zero lag commit (#292)
Browse files Browse the repository at this point in the history
Closes #290
  • Loading branch information
toddpalino committed Dec 3, 2017
1 parent 95fcb84 commit 89f072a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 15 deletions.
27 changes: 16 additions & 11 deletions core/internal/evaluator/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,20 +300,25 @@ func evaluatePartitionStatus(partition *protocol.ConsumerPartition) *protocol.Pa
}

func calculatePartitionStatus(offsets []*protocol.ConsumerOffset, currentLag uint64, timeNow int64) protocol.StatusConstant {
// First check if the lag was zero at any point, and skip the rest of the checks if this is true
if (currentLag > 0) && isLagAlwaysNotZero(offsets) {
// Check for errors, in order of severity starting with the worst. If any check comes back true, skip the rest
if checkIfOffsetsRewind(offsets) {
return protocol.StatusRewind
}
// If the current lag is zero, the partition is never in error
if currentLag > 0 {
// Check if the partition is stopped first, as this is a problem even if the consumer had zero lag at some point
if checkIfOffsetsStopped(offsets, timeNow) {
return protocol.StatusStop
}
if checkIfOffsetsStalled(offsets) {
return protocol.StatusStall
}
if checkIfLagNotDecreasing(offsets) {
return protocol.StatusWarning

// Now check if the lag was zero at any point, and skip the rest of the checks if this is true
if isLagAlwaysNotZero(offsets) {
// Check for errors, in order of severity starting with the worst. If any check comes back true, skip the rest
if checkIfOffsetsRewind(offsets) {
return protocol.StatusRewind
}
if checkIfOffsetsStalled(offsets) {
return protocol.StatusStall
}
if checkIfLagNotDecreasing(offsets) {
return protocol.StatusWarning
}
}
}
return protocol.StatusOK
Expand Down
27 changes: 23 additions & 4 deletions core/internal/evaluator/caching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,16 +211,16 @@ var tests = []testset{
{5000, 500000, 200},
}, 200, 900, false, false, false, false, true, protocol.StatusOK},

// 2 - same status, even though stop is now true, because we still have zero lag at the start
// 2 - status is now STOP because the time since last commit is great enough (500s), even though lag is zero at the start (fixed due to #290)
{[]*protocol.ConsumerOffset{
{1000, 100000, 0},
{2000, 200000, 50},
{3000, 300000, 100},
{4000, 400000, 150},
{5000, 500000, 200},
}, 200, 1000, false, false, true, false, true, protocol.StatusOK},
}, 200, 1000, false, false, true, false, true, protocol.StatusStop},

// 3 - status is STOP now because lag is always non-zero
// 3 - status is STOP when lag is always non-zero as well
{[]*protocol.ConsumerOffset{
{1000, 100000, 50},
{2000, 200000, 100},
Expand All @@ -229,7 +229,7 @@ var tests = []testset{
{5000, 500000, 250},
}, 250, 1000, true, false, true, false, true, protocol.StatusStop},

// 4 - status is OK because of zero lag, but stall is true because the offset is always the same and there is lag
// 4 - status is OK because of zero lag, but stall is true because the offset is always the same and there is lag (another commit with turn this to stall)
{[]*protocol.ConsumerOffset{
{1000, 100000, 0},
{1000, 200000, 50},
Expand Down Expand Up @@ -273,6 +273,25 @@ var tests = []testset{
{4000, 400000, 200},
{5000, 500000, 250},
}, 0, 1000, true, false, true, false, true, protocol.StatusOK},

// 9 - status is STOP due to timestamps because the current lag is non-zero, even though lag is always zero in offsets (#290)
{[]*protocol.ConsumerOffset{
{792748079, 1512224618356, 0},
{792748080, 1512224619362, 0},
{792748081, 1512224620366, 0},
{792748082, 1512224621367, 0},
{792748083, 1512224622370, 0},
{792748084, 1512224623373, 0},
{792748085, 1512224624378, 0},
{792748086, 1512224625379, 0},
{792748087, 1512224626383, 0},
{792748088, 1512224627383, 0},
{792748089, 1512224628383, 0},
{792748090, 1512224629388, 0},
{792748091, 1512224630391, 0},
{792748092, 1512224631394, 0},
{792748093, 1512224632397, 0},
}, 931, 1512224650, false, false, true, false, true, protocol.StatusStop},
}

func TestCachingEvaluator_CheckRules(t *testing.T) {
Expand Down

0 comments on commit 89f072a

Please sign in to comment.