Skip to content

Commit

Permalink
Merge remote-tracking branch 'nickdevp/master' into ISSU-341
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Desai committed May 15, 2018
2 parents 5132e66 + fecab1e commit d8f77cc
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 10 deletions.
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ before_script:
- go get honnef.co/go/tools/cmd/megacheck # Badass static analyzer/linter
- go get github.com/fzipp/gocyclo
- go get github.com/mattn/goveralls
- go get github.com/goreleaser/goreleaser

# script always run to completion (set +e). All of these code checks are must haves
# in a modern Go project.
Expand All @@ -66,4 +65,4 @@ script:
# goreleaser will run if the latest version tag matches the current commit
after_success:
- $GOPATH/bin/goveralls -coverprofile=profile.cov -service=travis-ci
- if [[ "$TRAVIS_SECURE_ENV_VARS" == true && "$TRAVIS_GO_VERSION" == "1.9.2" ]]; then docker login -u "$DOCKER_USERNAME" -p "$DOCKER_PASSWORD"; goreleaser; fi
- if [[ "$TRAVIS_SECURE_ENV_VARS" == true && "$TRAVIS_GO_VERSION" == "1.9.2" ]]; then docker login -u "$DOCKER_USERNAME" -p "$DOCKER_PASSWORD"; curl -sL https://git.io/goreleaser | bash; fi
6 changes: 3 additions & 3 deletions config/default-email.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ The Kafka consumer groups you are monitoring are currently showing problems. The

Cluster: {{.Result.Cluster}}
Group: {{.Result.Group}}
Status: {{if eq 1 .Result.Status}}OK{{else if eq 1 .Result.Status}}WARNING{{else if eq 2 .Result.Status}}ERROR{{end}}
Status: {{.Result.Status.String}}
Complete: {{.Result.Complete}}
Errors: {{len .Result.Partitions}} partitions have problems
{{range .Result.Partitions}} {{if eq 1 .Status}} OK{{if eq 2 .Status}} WARN{{else if eq 2 .Status}} ERR{{else if eq 3 .Status}} STOP{{else if eq 4 .Status}}STALL{{end}} {{.Topic}}:{{.Partition}} ({{.Start.Timestamp}}, {{.Start.Offset}}, {{.Start.Lag}}) -> ({{.End.Timestamp}}, {{.End.Offset}}, {{.End.Lag}})
{{end}}{{end}}
{{range .Result.Partitions}} {{.Status.String}} {{.Topic}}:{{.Partition}} ({{.Start.Timestamp}}, {{.Start.Offset}}, {{.Start.Lag}}) -> ({{.End.Timestamp}}, {{.End.Offset}}, {{.End.Lag}})
{{end}}
16 changes: 11 additions & 5 deletions core/internal/evaluator/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ type CachingEvaluator struct {
// fields that are appropriate to identify this coordinator
Log *zap.Logger

name string
expireCache int
name string
expireCache int
minimumComplete float32

RequestChannel chan *protocol.EvaluatorRequest
running sync.WaitGroup
Expand Down Expand Up @@ -63,6 +64,7 @@ func (module *CachingEvaluator) Configure(name string, configRoot string) {
// Set defaults for configs if needed
viper.SetDefault(configRoot+".expire-cache", 10)
module.expireCache = viper.GetInt(configRoot + ".expire-cache")
module.minimumComplete = float32(viper.GetFloat64(configRoot + ".minimum-complete"))
cacheExpire := time.Duration(module.expireCache) * time.Second

newCache, err := goswarm.NewSimple(&goswarm.Config{
Expand Down Expand Up @@ -221,7 +223,7 @@ func (module *CachingEvaluator) evaluateConsumerStatus(clusterAndConsumer string
completePartitions := 0
for topic, partitions := range topics {
for partitionID, partition := range partitions {
partitionStatus := evaluatePartitionStatus(partition)
partitionStatus := evaluatePartitionStatus(partition, module.minimumComplete)
partitionStatus.Topic = topic
partitionStatus.Partition = int32(partitionID)
partitionStatus.Owner = partition.Owner
Expand Down Expand Up @@ -260,7 +262,7 @@ func (module *CachingEvaluator) evaluateConsumerStatus(clusterAndConsumer string
return status, nil
}

func evaluatePartitionStatus(partition *protocol.ConsumerPartition) *protocol.PartitionStatus {
func evaluatePartitionStatus(partition *protocol.ConsumerPartition, minimumComplete float32) *protocol.PartitionStatus {
status := &protocol.PartitionStatus{
Status: protocol.StatusOK,
CurrentLag: partition.CurrentLag,
Expand Down Expand Up @@ -295,7 +297,11 @@ func evaluatePartitionStatus(partition *protocol.ConsumerPartition) *protocol.Pa
status.Start = offsets[0]
status.End = offsets[len(offsets)-1]

status.Status = calculatePartitionStatus(offsets, partition.BrokerOffsets, partition.CurrentLag, time.Now().Unix())
// If the partition does not meet the completeness threshold, just return it as OK
if status.Complete >= minimumComplete {
status.Status = calculatePartitionStatus(offsets, partition.BrokerOffsets, partition.CurrentLag, time.Now().Unix())
}

return status
}

Expand Down
4 changes: 4 additions & 0 deletions core/internal/notifier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ func (module *HTTPNotifier) Notify(status *protocol.ConsumerGroupStatus, eventID
}
req.Header.Set("Content-Type", "application/json")

for header, value := range viper.GetStringMapString("notifier." + module.name + ".headers") {
req.Header.Set(header, value)
}

resp, err := module.httpClient.Do(req)
if err != nil {
logger.Error("failed to send", zap.Error(err))
Expand Down
9 changes: 9 additions & 0 deletions core/internal/notifier/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func fixtureHTTPNotifier() *HTTPNotifier {
viper.Set("notifier.test.template-open", "template_open")
viper.Set("notifier.test.template-close", "template_close")
viper.Set("notifier.test.send-close", false)
viper.Set("notifier.test.headers", map[string]string{"Token": "testtoken"})

return &module
}
Expand Down Expand Up @@ -91,6 +92,10 @@ func TestHttpNotifier_Notify_Open(t *testing.T) {
assert.Len(t, headers, 1, "Expected to receive exactly one Content-Type header")
assert.Equalf(t, "application/json", headers[0], "Expected Content-Type header to be 'application/json', not '%v'", headers[0])

tokenHeaders, ok := r.Header["Token"]
assert.True(t, ok, "Expected to receive Token header")
assert.Equalf(t, "testtoken", tokenHeaders[0], "Expected Token header to be 'testtoken', not '%v'", tokenHeaders[0])

decoder := json.NewDecoder(r.Body)
var req HTTPRequest
err := decoder.Decode(&req)
Expand Down Expand Up @@ -138,6 +143,10 @@ func TestHttpNotifier_Notify_Close(t *testing.T) {
assert.Len(t, headers, 1, "Expected to receive exactly one Content-Type header")
assert.Equalf(t, "application/json", headers[0], "Expected Content-Type header to be 'application/json', not '%v'", headers[0])

tokenHeaders, ok := r.Header["Token"]
assert.True(t, ok, "Expected to receive Token header")
assert.Equalf(t, "testtoken", tokenHeaders[0], "Expected Token header to be 'testtoken', not '%v'", tokenHeaders[0])

decoder := json.NewDecoder(r.Body)
var req HTTPRequest
err := decoder.Decode(&req)
Expand Down

0 comments on commit d8f77cc

Please sign in to comment.