diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 853d849688..b737a0d252 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -446,6 +446,7 @@ func (ag *aggrGroup) run(nf notifyFunc) { ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval) ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals) ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals) + ctx = notify.WithGroupInterval(ctx, ag.opts.GroupInterval) // Wait the configured interval before calling flush again. ag.mtx.Lock() diff --git a/notify/notify.go b/notify/notify.go index d6c741fe47..a3f989bbfd 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -116,6 +116,7 @@ const ( keyNow keyMuteTimeIntervals keyActiveTimeIntervals + keyGroupInterval ) // WithReceiverName populates a context with a receiver name. @@ -162,6 +163,11 @@ func WithActiveTimeIntervals(ctx context.Context, at []string) context.Context { return context.WithValue(ctx, keyActiveTimeIntervals, at) } +// WithGroupInterval populates a context with a group interval. +func WithGroupInterval(ctx context.Context, t time.Duration) context.Context { + return context.WithValue(ctx, keyGroupInterval, t) +} + // RepeatInterval extracts a repeat interval from the context. Iff none exists, the // second argument is false. func RepeatInterval(ctx context.Context) (time.Duration, bool) { @@ -190,6 +196,13 @@ func GroupLabels(ctx context.Context) (model.LabelSet, bool) { return v, ok } +// GroupInterval extracts group interval from the context. Iff none exists, the +// second argument is false. +func GroupInterval(ctx context.Context) (time.Duration, bool) { + v, ok := ctx.Value(keyGroupInterval).(time.Duration) + return v, ok +} + // Now extracts a now timestamp from the context. Iff none exists, the // second argument is false. func Now(ctx context.Context) (time.Time, bool) { @@ -560,21 +573,24 @@ func hashAlert(a *types.Alert) uint64 { return hash } -func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool { +func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat, groupInterval time.Duration, now time.Time) bool { // If we haven't notified about the alert group before, notify right away // unless we only have resolved alerts. if entry == nil { return len(firing) > 0 } - if !entry.IsFiringSubset(firing) { + groupIntervalExpired := float64(now.UnixMilli()-entry.Timestamp.UnixMilli())/float64(groupInterval.Milliseconds()) >= 0.99 + groupIntervalMuted := len(entry.FiringAlerts) > 0 && !groupIntervalExpired + + if !entry.IsFiringSubset(firing) && !groupIntervalMuted { return true } // Notify about all alerts being resolved. // This is done irrespective of the send_resolved flag to make sure that // the firing alerts are cleared from the notification log. - if len(firing) == 0 { + if len(firing) == 0 && !groupIntervalMuted { // If the current alert group and last notification contain no firing // alert, it means that some alerts have been fired and resolved during the // last interval. In this case, there is no need to notify the receiver @@ -582,7 +598,7 @@ func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint return len(entry.FiringAlerts) > 0 } - if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) { + if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) && !groupIntervalMuted { return true } @@ -597,11 +613,21 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al return ctx, nil, errors.New("group key missing") } + groupInterval, ok := GroupInterval(ctx) + if !ok { + return ctx, nil, errors.New("group interval missing") + } + repeatInterval, ok := RepeatInterval(ctx) if !ok { return ctx, nil, errors.New("repeat interval missing") } + now, ok := Now(ctx) + if !ok { + return ctx, nil, errors.New("now timestamp missing") + } + firingSet := map[uint64]struct{}{} resolvedSet := map[uint64]struct{}{} firing := []uint64{} @@ -636,9 +662,10 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al return ctx, nil, errors.Errorf("unexpected entry result size %d", len(entries)) } - if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) { + if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval, groupInterval, now) { return ctx, alerts, nil } + return ctx, nil, nil } diff --git a/notify/notify_test.go b/notify/notify_test.go index 5beaf18242..a945d91261 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -207,7 +207,7 @@ func TestDedupStageNeedsUpdate(t *testing.T) { now: func() time.Time { return now }, rs: sendResolved(c.resolve), } - res := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, c.repeat) + res := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, c.repeat, 2*time.Hour, now) require.Equal(t, c.res, res) } } @@ -234,11 +234,22 @@ func TestDedupStage(t *testing.T) { ctx = WithGroupKey(ctx, "1") + _, _, err = s.Exec(ctx, log.NewNopLogger()) + require.EqualError(t, err, "group interval missing") + + ctx = WithGroupKey(ctx, "1") + ctx = WithGroupInterval(ctx, time.Hour) + _, _, err = s.Exec(ctx, log.NewNopLogger()) require.EqualError(t, err, "repeat interval missing") ctx = WithRepeatInterval(ctx, time.Hour) + _, _, err = s.Exec(ctx, log.NewNopLogger()) + require.EqualError(t, err, "now timestamp missing") + + ctx = WithNow(ctx, now) + alerts := []*types.Alert{{}, {}, {}} // Must catch notification log query errors. @@ -288,7 +299,7 @@ func TestDedupStage(t *testing.T) { qres: []*nflogpb.Entry{ { FiringAlerts: []uint64{1, 2, 3, 4}, - Timestamp: now, + Timestamp: now.Add(-time.Hour), }, }, } diff --git a/test/with_api_v2/acceptance/send_test.go b/test/with_api_v2/acceptance/send_test.go index 4e13d37ca5..b507229b1b 100644 --- a/test/with_api_v2/acceptance/send_test.go +++ b/test/with_api_v2/acceptance/send_test.go @@ -207,9 +207,9 @@ receivers: am.Push(At(1), Alert("alertname", "test1")) co1.Want(Between(2, 2.5), Alert("alertname", "test1").Active(1)) - co1.Want(Between(6, 6.5), Alert("alertname", "test1").Active(1)) + co1.Want(Between(5, 6.5), Alert("alertname", "test1").Active(1)) - co2.Want(Between(6, 6.5), Alert("alertname", "test1").Active(1)) + co2.Want(Between(5, 6.5), Alert("alertname", "test1").Active(1)) at.Run()