Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix config reloads not respecting group interval #3074

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
ctx = notify.WithGroupInterval(ctx, ag.opts.GroupInterval)

// Wait the configured interval before calling flush again.
ag.mtx.Lock()
Expand Down
37 changes: 32 additions & 5 deletions notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ const (
keyNow
keyMuteTimeIntervals
keyActiveTimeIntervals
keyGroupInterval
)

// WithReceiverName populates a context with a receiver name.
Expand Down Expand Up @@ -162,6 +163,11 @@ func WithActiveTimeIntervals(ctx context.Context, at []string) context.Context {
return context.WithValue(ctx, keyActiveTimeIntervals, at)
}

// WithGroupInterval populates a context with a group interval.
func WithGroupInterval(ctx context.Context, t time.Duration) context.Context {
return context.WithValue(ctx, keyGroupInterval, t)
}

// RepeatInterval extracts a repeat interval from the context. Iff none exists, the
// second argument is false.
func RepeatInterval(ctx context.Context) (time.Duration, bool) {
Expand Down Expand Up @@ -190,6 +196,13 @@ func GroupLabels(ctx context.Context) (model.LabelSet, bool) {
return v, ok
}

// GroupInterval extracts group interval from the context. Iff none exists, the
// second argument is false.
func GroupInterval(ctx context.Context) (time.Duration, bool) {
v, ok := ctx.Value(keyGroupInterval).(time.Duration)
return v, ok
}

// Now extracts a now timestamp from the context. Iff none exists, the
// second argument is false.
func Now(ctx context.Context) (time.Time, bool) {
Expand Down Expand Up @@ -560,29 +573,32 @@ func hashAlert(a *types.Alert) uint64 {
return hash
}

func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool {
func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat, groupInterval time.Duration, now time.Time) bool {
// If we haven't notified about the alert group before, notify right away
// unless we only have resolved alerts.
if entry == nil {
return len(firing) > 0
}

if !entry.IsFiringSubset(firing) {
groupIntervalExpired := float64(now.UnixMilli()-entry.Timestamp.UnixMilli())/float64(groupInterval.Milliseconds()) >= 0.99
groupIntervalMuted := len(entry.FiringAlerts) > 0 && !groupIntervalExpired

if !entry.IsFiringSubset(firing) && !groupIntervalMuted {
return true
}

// Notify about all alerts being resolved.
// This is done irrespective of the send_resolved flag to make sure that
// the firing alerts are cleared from the notification log.
if len(firing) == 0 {
if len(firing) == 0 && !groupIntervalMuted {
// If the current alert group and last notification contain no firing
// alert, it means that some alerts have been fired and resolved during the
// last interval. In this case, there is no need to notify the receiver
// since it doesn't know about them.
return len(entry.FiringAlerts) > 0
}

if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) {
if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) && !groupIntervalMuted {
return true
}

Expand All @@ -597,11 +613,21 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al
return ctx, nil, errors.New("group key missing")
}

groupInterval, ok := GroupInterval(ctx)
if !ok {
return ctx, nil, errors.New("group interval missing")
}

repeatInterval, ok := RepeatInterval(ctx)
if !ok {
return ctx, nil, errors.New("repeat interval missing")
}

now, ok := Now(ctx)
if !ok {
return ctx, nil, errors.New("now timestamp missing")
}

firingSet := map[uint64]struct{}{}
resolvedSet := map[uint64]struct{}{}
firing := []uint64{}
Expand Down Expand Up @@ -636,9 +662,10 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al
return ctx, nil, errors.Errorf("unexpected entry result size %d", len(entries))
}

if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval, groupInterval, now) {
return ctx, alerts, nil
}

return ctx, nil, nil
}

Expand Down
15 changes: 13 additions & 2 deletions notify/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
now: func() time.Time { return now },
rs: sendResolved(c.resolve),
}
res := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, c.repeat)
res := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, c.repeat, 2*time.Hour, now)
require.Equal(t, c.res, res)
}
}
Expand All @@ -234,11 +234,22 @@ func TestDedupStage(t *testing.T) {

ctx = WithGroupKey(ctx, "1")

_, _, err = s.Exec(ctx, log.NewNopLogger())
require.EqualError(t, err, "group interval missing")

ctx = WithGroupKey(ctx, "1")
ctx = WithGroupInterval(ctx, time.Hour)

_, _, err = s.Exec(ctx, log.NewNopLogger())
require.EqualError(t, err, "repeat interval missing")

ctx = WithRepeatInterval(ctx, time.Hour)

_, _, err = s.Exec(ctx, log.NewNopLogger())
require.EqualError(t, err, "now timestamp missing")

ctx = WithNow(ctx, now)

alerts := []*types.Alert{{}, {}, {}}

// Must catch notification log query errors.
Expand Down Expand Up @@ -288,7 +299,7 @@ func TestDedupStage(t *testing.T) {
qres: []*nflogpb.Entry{
{
FiringAlerts: []uint64{1, 2, 3, 4},
Timestamp: now,
Timestamp: now.Add(-time.Hour),
},
},
}
Expand Down
4 changes: 2 additions & 2 deletions test/with_api_v2/acceptance/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,9 @@ receivers:
am.Push(At(1), Alert("alertname", "test1"))

co1.Want(Between(2, 2.5), Alert("alertname", "test1").Active(1))
co1.Want(Between(6, 6.5), Alert("alertname", "test1").Active(1))
co1.Want(Between(5, 6.5), Alert("alertname", "test1").Active(1))

co2.Want(Between(6, 6.5), Alert("alertname", "test1").Active(1))
co2.Want(Between(5, 6.5), Alert("alertname", "test1").Active(1))

at.Run()

Expand Down