Skip to content

Commit

Permalink
fix(alert): alert behavior when find old incident
Browse files Browse the repository at this point in the history
  • Loading branch information
macrat committed Nov 27, 2022
1 parent c5a7fb0 commit c1af03f
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 18 deletions.
5 changes: 0 additions & 5 deletions internal/store/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ func newIncident(r api.Record) *api.Incident {
}
}

// incidentIsContinued reports whether record r continues incident i, that is,
// the incident has no end time yet and r has the same status and message.
func incidentIsContinued(i *api.Incident, r api.Record) bool {
	// An incident that already has an end time can not be continued.
	if !i.EndsAt.IsZero() {
		return false
	}
	if i.Status != r.Status {
		return false
	}
	return i.Message == r.Message
}

type byIncidentCaused []*api.Incident

func (xs byIncidentCaused) Len() int {
Expand Down
94 changes: 81 additions & 13 deletions internal/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

const (
PROBE_HISTORY_LEN = 40
PROBE_HISTORY_LEN = 60
INCIDENT_HISTORY_LEN = 20
)

Expand Down Expand Up @@ -238,19 +238,62 @@ func (s *Store) IncidentHistory() []*api.Incident {
return s.incidentHistoryWithoutLock()
}

// searchLastIncident looks up the incident of target that a record observed
// at time t should be compared against. It returns nil when no such incident
// is found.
//
// The lookup proceeds as follows:
//
//  1. If target has a current (unfinished) incident, that one is returned.
//  2. If the newest probe history record of target is older than t, nil is
//     returned without looking at the incident history.
//  3. Otherwise the incident history is scanned from newest to oldest for an
//     incident of target that ends after t. It is returned when it started
//     before t, or when the newest probe history record preceding its start
//     is itself older than t (presumably meaning no other probe result sits
//     between t and the beginning of the incident).
func (s *Store) searchLastIncident(target string, t time.Time) *api.Incident {
	if ongoing, found := s.currentIncidents[target]; found {
		return ongoing
	}

	history, hasHistory := s.probeHistory[target]
	if hasHistory {
		// t is newer than every known record, so it can not belong to a past incident.
		if n := len(history.Records); n > 0 && history.Records[n-1].Time.Before(t) {
			return nil
		}
	}

	for idx := len(s.incidentHistory) - 1; idx >= 0; idx-- {
		candidate := s.incidentHistory[idx]

		if candidate.Target.String() != target || !t.Before(candidate.EndsAt) {
			continue
		}

		// t is inside the incident's time range.
		if candidate.StartsAt.Before(t) {
			return candidate
		}

		if !hasHistory {
			continue
		}

		// t is not after the beginning of the incident. Find the newest record
		// that precedes the incident and check whether t comes after it.
		for j := len(history.Records) - 1; j >= 0; j-- {
			rec := history.Records[j]
			if !rec.Time.Before(candidate.StartsAt) {
				continue
			}
			if rec.Time.Before(t) {
				return candidate
			}
			break
		}
	}

	return nil
}

func (s *Store) setIncidentIfNeed(r api.Record, needCallback bool) {
if r.Status == api.StatusAborted {
return
}

target := r.Target.String()
if cur, ok := s.currentIncidents[target]; ok {
if incidentIsContinued(cur, r) {

if incident := s.searchLastIncident(target, r.Time); incident != nil {
if incident.StartsAt.After(r.Time) {
incident.StartsAt = r.Time
}

// nothing to do for continue of current incident, or for old resolved incident.
if incident.Status == r.Status && incident.Message == r.Message && (incident.EndsAt.IsZero() || incident.EndsAt.After(r.Time)) {
return
}

cur.EndsAt = r.Time
s.incidentHistory = append(s.incidentHistory, cur)
incident.EndsAt = r.Time
s.incidentHistory = append(s.incidentHistory, incident)
delete(s.currentIncidents, target)

if len(s.incidentHistory) > INCIDENT_HISTORY_LEN {
Expand All @@ -267,13 +310,38 @@ func (s *Store) setIncidentIfNeed(r api.Record, needCallback bool) {

if r.Status != api.StatusHealthy {
incident := newIncident(r)
s.currentIncidents[target] = incident

// kick incident callback when new incident caused
if needCallback {
s.incidentCount++
for _, cb := range s.OnStatusChanged {
cb(r)
if hs, ok := s.probeHistory[target]; ok && len(hs.Records) > 0 && hs.Records[len(hs.Records)-1].Time.After(r.Time) {
var next api.Record

for _, h := range hs.Records {
if r.Time.Before(h.Time) {
incident.EndsAt = h.Time
next = h
break
}
}
s.incidentHistory = append(s.incidentHistory, incident)

// kick incident callback when new incident caused
if needCallback {
s.incidentCount++
for _, cb := range s.OnStatusChanged {
cb(r)
if next.Status == api.StatusHealthy {
cb(next)
}
}
}
} else {
s.currentIncidents[target] = incident

// kick incident callback when new incident caused
if needCallback {
s.incidentCount++
for _, cb := range s.OnStatusChanged {
cb(r)
}
}
}
}
Expand All @@ -294,8 +362,8 @@ func (s *Store) Report(source *api.URL, r api.Record) {
s.historyLock.Lock()
defer s.historyLock.Unlock()

s.probeHistory.Append(source, r)
s.setIncidentIfNeed(r, true)
s.probeHistory.Append(source, r)
}
}

Expand Down Expand Up @@ -365,8 +433,8 @@ func (s *Store) Restore() error {
}

if r.Target.Scheme != "alert" && r.Target.Scheme != "ayd" {
s.probeHistory.Append(r.Target, r)
s.setIncidentIfNeed(r, false)
s.probeHistory.Append(r.Target, r)
}
}

Expand Down
92 changes: 92 additions & 0 deletions internal/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,98 @@ func TestStore_incident(t *testing.T) {
assertCallbackCount(6)
}

// TestStore_delayedIncident checks the incident bookkeeping and the
// OnStatusChanged callbacks when records are reported out of chronological
// order, i.e. when a record older than already stored ones arrives late.
func TestStore_delayedIncident(t *testing.T) {
	s := testutil.NewStore(t)
	defer s.Close()

	// messages collects the message of every record passed to the callback,
	// in call order. callbackCount counts the calls.
	var messages []string
	callbackCount := 0
	s.OnStatusChanged = []store.RecordHandler{
		func(r api.Record) {
			messages = append(messages, r.Message)
			callbackCount++
		},
	}

	// assert checks the callbacks received so far and the newest entry of the
	// incident history.
	//
	// count is the expected total number of callbacks, from and to are the
	// expected start and end of the newest incident in Unix seconds, and
	// message lists the expected callback messages in order.
	assert := func(count int, from, to int64, message ...string) {
		t.Helper()

		if count != callbackCount {
			t.Fatalf("unexpected number of callbacks: expected %d but found %d", count, callbackCount)
		}
		// Compare lengths first so that a mismatch is reported as a test
		// failure instead of an index out of range panic, and so that
		// expected messages are never silently left unchecked.
		if len(message) != len(messages) {
			t.Fatalf("unexpected number of callback messages: expected %d but found %d", len(message), len(messages))
		}
		for i := range messages {
			if message[i] != messages[i] {
				t.Fatalf("unexpected incident message[%d]: expected %q but found %q", i, message[i], messages[i])
			}
		}

		// Every scenario in this test ends with the incident resolved.
		current := s.CurrentIncidents()
		if len(current) != 0 {
			t.Fatalf("unexpected current incidents found: %v", current)
		}

		history := s.IncidentHistory()
		if len(history) == 0 {
			t.Fatalf("incident not found")
		}
		last := history[len(history)-1]

		if last.Target.String() != "dummy:" {
			t.Fatalf("unexpected incident found: %s", last)
		}

		if last.StartsAt.Unix() != from {
			t.Fatalf("incident should begins at %d but begins at %d", from, last.StartsAt.Unix())
		}
		if last.EndsAt.Unix() != to {
			t.Errorf("incident should ends at %d but ends at %d", to, last.EndsAt.Unix())
		}
		if t.Failed() {
			t.FailNow()
		}
	}

	// report sends a record for the "dummy:" target whose timestamp is offset
	// seconds after the Unix epoch.
	report := func(offset int64, message string, status api.Status) {
		t.Helper()

		s.Report(&api.URL{Scheme: "dummy"}, api.Record{
			Time:    time.Unix(0, 0).Add(time.Duration(offset) * time.Second),
			Target:  &api.URL{Scheme: "dummy"},
			Message: message,
			Status:  status,
		})
	}

	// Timeline once every record has been reported (offset in seconds,
	// F = failure, H = healthy, order = the order the records are reported):
	//
	//   offset  05  10  15  20  25  30  35  40  50
	//   status   F   H   F   F   F   F   F   H   H
	//   order    9   1   8   2   6   3   7   4   5
	//
	// Records 1-5 arrive in chronological order. Records 6-9 arrive late:
	//
	//   6 (offset 25): first test, put into the middle of the incident.
	//   7 (offset 35): second test, put at the very end of the incident.
	//   8 (offset 15): third test, put just before the incident begins.
	//   9 (offset 05): fourth test, older than everything; a new incident.

	report(10, "hello1", api.StatusHealthy)
	report(20, "oh no", api.StatusFailure)
	report(30, "oh no", api.StatusFailure)
	report(40, "hello2", api.StatusHealthy)
	report(50, "hello3", api.StatusHealthy)
	assert(2, 20, 40, "oh no", "hello2")

	report(25, "oh no", api.StatusFailure) // First test
	assert(2, 20, 40, "oh no", "hello2")

	report(35, "oh no", api.StatusFailure) // Second test
	assert(2, 20, 40, "oh no", "hello2")

	// The incident is expected to be extended backward to offset 15.
	report(15, "oh no", api.StatusFailure) // Third test
	assert(2, 15, 40, "oh no", "hello2")

	// A separate incident from offset 5 to 10 is expected, with two more callbacks.
	report(5, "wah", api.StatusFailure) // Fourth test
	assert(4, 5, 10, "oh no", "hello2", "wah", "hello1")
}

func TestStore_incident_len_limit(t *testing.T) {
s := testutil.NewStore(t)
defer s.Close()
Expand Down

0 comments on commit c1af03f

Please sign in to comment.