diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go
index 853d849688..4ae5c28568 100644
--- a/dispatch/dispatch.go
+++ b/dispatch/dispatch.go
@@ -446,10 +446,11 @@ func (ag *aggrGroup) run(nf notifyFunc) {
 			ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
 			ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
 			ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
+			ctx = notify.WithGroupInterval(ctx, ag.opts.GroupInterval)
 
 			// Wait the configured interval before calling flush again.
 			ag.mtx.Lock()
-			ag.next.Reset(ag.opts.GroupInterval)
+			ag.next.Reset(ag.opts.GroupWait)
 			ag.hasFlushed = true
 			ag.mtx.Unlock()
 
diff --git a/notify/notify.go b/notify/notify.go
index d6c741fe47..e9ca40fbf2 100644
--- a/notify/notify.go
+++ b/notify/notify.go
@@ -116,6 +116,7 @@ const (
 	keyNow
 	keyMuteTimeIntervals
 	keyActiveTimeIntervals
+	keyGroupInterval
 )
 
 // WithReceiverName populates a context with a receiver name.
@@ -162,6 +163,11 @@ func WithActiveTimeIntervals(ctx context.Context, at []string) context.Context {
 	return context.WithValue(ctx, keyActiveTimeIntervals, at)
 }
 
+// WithGroupInterval populates a context with a group interval.
+func WithGroupInterval(ctx context.Context, t time.Duration) context.Context {
+	return context.WithValue(ctx, keyGroupInterval, t)
+}
+
 // RepeatInterval extracts a repeat interval from the context. Iff none exists, the
 // second argument is false.
 func RepeatInterval(ctx context.Context) (time.Duration, bool) {
@@ -190,6 +196,13 @@ func GroupLabels(ctx context.Context) (model.LabelSet, bool) {
 	return v, ok
 }
 
+// GroupInterval extracts group interval from the context. Iff none exists, the
+// second argument is false.
+func GroupInterval(ctx context.Context) (time.Duration, bool) {
+	v, ok := ctx.Value(keyGroupInterval).(time.Duration)
+	return v, ok
+}
+
 // Now extracts a now timestamp from the context. Iff none exists, the
 // second argument is false.
 func Now(ctx context.Context) (time.Time, bool) {
@@ -560,14 +573,18 @@ func hashAlert(a *types.Alert) uint64 {
 	return hash
 }
 
-func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool {
+func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat, groupInterval time.Duration) bool {
 	// If we haven't notified about the alert group before, notify right away
 	// unless we only have resolved alerts.
 	if entry == nil {
 		return len(firing) > 0
 	}
 
-	if !entry.IsFiringSubset(firing) {
+	// Changes to the group are held back while the last notification that had
+	// firing alerts is younger than groupInterval, measured on the stage clock.
+	groupIntervalMuted := len(entry.FiringAlerts) > 0 && entry.Timestamp.After(n.now().Add(-groupInterval))
+
+	if !entry.IsFiringSubset(firing) && !groupIntervalMuted {
 		return true
 	}
 
@@ -582,7 +599,7 @@ func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint
 		return len(entry.FiringAlerts) > 0
 	}
 
-	if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) {
+	if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) && !groupIntervalMuted {
 		return true
 	}
 
@@ -597,6 +614,11 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al
 		return ctx, nil, errors.New("group key missing")
 	}
 
+	groupInterval, ok := GroupInterval(ctx)
+	if !ok {
+		return ctx, nil, errors.New("group interval missing")
+	}
+
 	repeatInterval, ok := RepeatInterval(ctx)
 	if !ok {
 		return ctx, nil, errors.New("repeat interval missing")
@@ -636,7 +658,7 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al
 		return ctx, nil, errors.Errorf("unexpected entry result size %d", len(entries))
 	}
 
-	if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
+	if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval, groupInterval) {
 		return ctx, alerts, nil
 	}
 	return ctx, nil, nil
diff --git a/notify/notify_test.go b/notify/notify_test.go
index 5beaf18242..50aef9a35a 100644
--- a/notify/notify_test.go
+++ b/notify/notify_test.go
@@ -207,7 +207,7 @@ func TestDedupStageNeedsUpdate(t *testing.T) {
 			now: func() time.Time { return now },
 			rs:  sendResolved(c.resolve),
 		}
-		res := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, c.repeat)
+		res := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, c.repeat, 2*time.Hour)
 		require.Equal(t, c.res, res)
 	}
 }