diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 4ae5c28568..b737a0d252 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -450,7 +450,7 @@ func (ag *aggrGroup) run(nf notifyFunc) { // Wait the configured interval before calling flush again. ag.mtx.Lock() - ag.next.Reset(ag.opts.GroupWait) + ag.next.Reset(ag.opts.GroupInterval) ag.hasFlushed = true ag.mtx.Unlock() diff --git a/notify/notify.go b/notify/notify.go index e9ca40fbf2..a3f989bbfd 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -573,14 +573,15 @@ func hashAlert(a *types.Alert) uint64 { return hash } -func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat, groupInterval time.Duration) bool { +func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat, groupInterval time.Duration, now time.Time) bool { // If we haven't notified about the alert group before, notify right away // unless we only have resolved alerts. if entry == nil { return len(firing) > 0 } - groupIntervalMuted := len(entry.FiringAlerts) > 0 && entry.Timestamp.After(time.Now().Add(-groupInterval)) + groupIntervalExpired := float64(now.UnixMilli()-entry.Timestamp.UnixMilli())/float64(groupInterval.Milliseconds()) >= 0.99 + groupIntervalMuted := len(entry.FiringAlerts) > 0 && !groupIntervalExpired if !entry.IsFiringSubset(firing) && !groupIntervalMuted { return true @@ -589,7 +590,7 @@ func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint // Notify about all alerts being resolved. // This is done irrespective of the send_resolved flag to make sure that // the firing alerts are cleared from the notification log. - if len(firing) == 0 { + if len(firing) == 0 && !groupIntervalMuted { // If the current alert group and last notification contain no firing // alert, it means that some alerts have been fired and resolved during the // last interval. In this case, there is no need to notify the receiver @@ -597,7 +598,7 @@ func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint return len(entry.FiringAlerts) > 0 } - if n.rs.SendResolved() && !groupIntervalMuted { + if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) && !groupIntervalMuted { return true } @@ -622,6 +623,11 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al return ctx, nil, errors.New("repeat interval missing") } + now, ok := Now(ctx) + if !ok { + return ctx, nil, errors.New("now timestamp missing") + } + firingSet := map[uint64]struct{}{} resolvedSet := map[uint64]struct{}{} firing := []uint64{} @@ -656,9 +662,10 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al return ctx, nil, errors.Errorf("unexpected entry result size %d", len(entries)) } - if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval, groupInterval) { + if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval, groupInterval, now) { return ctx, alerts, nil } + return ctx, nil, nil } diff --git a/notify/notify_test.go b/notify/notify_test.go index 50aef9a35a..a5da56df31 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -207,7 +207,7 @@ func TestDedupStageNeedsUpdate(t *testing.T) { now: func() time.Time { return now }, rs: sendResolved(c.resolve), } - res := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, c.repeat, 2*time.Hour) + res := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, c.repeat, 2*time.Hour, now) require.Equal(t, c.res, res) } } @@ -234,6 +234,12 @@ func TestDedupStage(t *testing.T) { ctx = WithGroupKey(ctx, "1") + _, _, err = s.Exec(ctx, log.NewNopLogger()) + require.EqualError(t, err, "group interval missing") + + ctx = WithGroupKey(ctx, "1") + ctx = WithGroupInterval(ctx, time.Hour) + _, _, err = s.Exec(ctx, log.NewNopLogger()) require.EqualError(t, err, "repeat interval missing") @@ -288,7 +294,7 @@ func TestDedupStage(t *testing.T) { qres: []*nflogpb.Entry{ { FiringAlerts: []uint64{1, 2, 3, 4}, - Timestamp: now, + Timestamp: now.Add(-time.Hour), }, }, } diff --git a/test/with_api_v2/acceptance/send_test.go b/test/with_api_v2/acceptance/send_test.go index 4e13d37ca5..b507229b1b 100644 --- a/test/with_api_v2/acceptance/send_test.go +++ b/test/with_api_v2/acceptance/send_test.go @@ -207,9 +207,9 @@ receivers: am.Push(At(1), Alert("alertname", "test1")) co1.Want(Between(2, 2.5), Alert("alertname", "test1").Active(1)) - co1.Want(Between(6, 6.5), Alert("alertname", "test1").Active(1)) + co1.Want(Between(5, 6.5), Alert("alertname", "test1").Active(1)) - co2.Want(Between(6, 6.5), Alert("alertname", "test1").Active(1)) + co2.Want(Between(5, 6.5), Alert("alertname", "test1").Active(1)) at.Run()