diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 2468ee8f0e..a7a2413650 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -447,6 +447,7 @@ func (ag *aggrGroup) run(nf notifyFunc) { ctx = notify.WithNow(ctx, now) // Populate context with information needed along the pipeline. + ctx = notify.WithGroupInterval(ctx, ag.opts.GroupInterval) ctx = notify.WithGroupKey(ctx, ag.GroupKey()) ctx = notify.WithGroupLabels(ctx, ag.labels) ctx = notify.WithReceiverName(ctx, ag.opts.Receiver) diff --git a/nflog/nflog.go b/nflog/nflog.go index 3084bf00a0..3eef52d0e1 100644 --- a/nflog/nflog.go +++ b/nflog/nflog.go @@ -377,7 +377,7 @@ func stateKey(k string, r *pb.Receiver) string { return fmt.Sprintf("%s:%s", k, receiverKey(r)) } -func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error { +func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration, dispatchTime time.Time) error { // Write all st with the same timestamp. now := l.now() key := stateKey(gkey, r) @@ -405,6 +405,7 @@ func (l *Log) Log(r *pb.Receiver, gkey string, firingAlerts, resolvedAlerts []ui Timestamp: now, FiringAlerts: firingAlerts, ResolvedAlerts: resolvedAlerts, + DispatchTime: dispatchTime, }, ExpiresAt: expiresAt, } diff --git a/nflog/nflog_test.go b/nflog/nflog_test.go index a702ddb5e9..0e86eba65c 100644 --- a/nflog/nflog_test.go +++ b/nflog/nflog_test.go @@ -355,7 +355,7 @@ func TestQuery(t *testing.T) { firingAlerts := []uint64{1, 2, 3} resolvedAlerts := []uint64{4, 5} - err = nl.Log(recv, "key", firingAlerts, resolvedAlerts, 0) + err = nl.Log(recv, "key", firingAlerts, resolvedAlerts, 0, time.Time{}) require.NoError(t, err, "logging notification failed") entries, err := nl.Query(QGroupKey("key"), QReceiver(recv)) diff --git a/nflog/nflogpb/nflog.pb.go b/nflog/nflogpb/nflog.pb.go index a5960171a0..e603269818 100644 --- a/nflog/nflogpb/nflog.pb.go +++ b/nflog/nflogpb/nflog.pb.go @@ -92,10 +92,12 @@ type Entry struct { // FiringAlerts list of hashes of firing alerts at the last notification time. FiringAlerts []uint64 `protobuf:"varint,6,rep,packed,name=firing_alerts,json=firingAlerts,proto3" json:"firing_alerts,omitempty"` // ResolvedAlerts list of hashes of resolved alerts at the last notification time. - ResolvedAlerts []uint64 `protobuf:"varint,7,rep,packed,name=resolved_alerts,json=resolvedAlerts,proto3" json:"resolved_alerts,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + ResolvedAlerts []uint64 `protobuf:"varint,7,rep,packed,name=resolved_alerts,json=resolvedAlerts,proto3" json:"resolved_alerts,omitempty"` + // Timestamp of the last time the notifications started. + DispatchTime time.Time `protobuf:"bytes,8,opt,name=dispatch_time,json=dispatchTime,proto3,stdtime" json:"dispatch_time"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *Entry) Reset() { *m = Entry{} } @@ -186,32 +188,33 @@ func init() { func init() { proto.RegisterFile("nflog.proto", fileDescriptor_c2d9785ad9c3e602) } var fileDescriptor_c2d9785ad9c3e602 = []byte{ - // 385 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x90, 0xcf, 0x6e, 0xd3, 0x40, - 0x10, 0xc6, 0xbb, 0x4d, 0xd3, 0xda, 0xe3, 0xb4, 0x94, 0x15, 0x07, 0xcb, 0x08, 0xc7, 0x0a, 0x48, - 0xf8, 0x82, 0x23, 0x95, 0x27, 0x68, 0x10, 0x12, 0x12, 0x82, 0xc3, 0x8a, 0x2b, 0xb2, 0x36, 0x74, - 0xb2, 0x5e, 0x61, 0x7b, 0xad, 0xf5, 0x36, 0x6a, 0xde, 0x82, 0x47, 0xe0, 0x71, 0x72, 0xe4, 0x09, - 0xf8, 0x93, 0x27, 0x41, 0xde, 0xb5, 0x1d, 0x8e, 0xdc, 0x66, 0x7f, 0xf3, 0xcd, 0xcc, 0xb7, 0x1f, - 0x04, 0xf5, 0xa6, 0x54, 0x22, 0x6b, 0xb4, 0x32, 0x8a, 0x5e, 0xd8, 0x47, 0xb3, 0x8e, 0xe6, 0x42, - 0x29, 0x51, 0xe2, 0xd2, 0xe2, 0xf5, 0xfd, 0x66, 0x69, 0x64, 0x85, 0xad, 0xe1, 0x55, 0xe3, 0x94, - 0xd1, 0x13, 0xa1, 0x84, 0xb2, 0xe5, 0xb2, 0xab, 0x1c, 0x5d, 0x7c, 0x06, 0x8f, 0xe1, 0x17, 0x94, - 0x5b, 0xd4, 0xf4, 0x19, 0x80, 0xd0, 0xea, 0xbe, 0xc9, 0x6b, 0x5e, 0x61, 0x48, 0x12, 0x92, 0xfa, - 0xcc, 0xb7, 0xe4, 0x23, 0xaf, 0x90, 0x26, 0x10, 0xc8, 0xda, 0xa0, 0xd0, 0xdc, 0x48, 0x55, 0x87, - 0xa7, 0xb6, 0xff, 0x2f, 0xa2, 0xd7, 0x30, 0x91, 0x77, 0x0f, 0xe1, 0x24, 0x21, 0xe9, 0x25, 0xeb, - 0xca, 0xc5, 0xf7, 0x53, 0x98, 0xbe, 0xad, 0x8d, 0xde, 0xd1, 0xa7, 0xe0, 0x56, 0xe5, 0x5f, 0x71, - 0x67, 0x77, 0xcf, 0x98, 0x67, 0xc1, 0x7b, 0xdc, 0xd1, 0x57, 0xe0, 0xe9, 0xde, 0x85, 0xdd, 0x1b, - 0xdc, 0x3c, 0xce, 0xfa, 0x8f, 0x65, 0x83, 0x3d, 0x36, 0x4a, 0x8e, 0x46, 0x0b, 0xde, 0x16, 0xf6, - 0xdc, 0xac, 0x37, 0xfa, 0x8e, 0xb7, 0x05, 0x8d, 0xba, 0x6d, 0xad, 0x2a, 0xb7, 0x78, 0x17, 0x9e, - 0x25, 0x24, 0xf5, 0xd8, 0xf8, 0xa6, 0x2b, 0xf0, 0xc7, 0x60, 0xc2, 0xa9, 0x3d, 0x15, 0x65, 0x2e, - 0xba, 0x6c, 0x88, 0x2e, 0xfb, 0x34, 0x28, 0x56, 0xde, 0xfe, 0xe7, 0xfc, 0xe4, 0xdb, 0xaf, 0x39, - 0x61, 0xc7, 0x31, 0xfa, 0x1c, 0x2e, 0x37, 0x52, 0xcb, 0x5a, 0xe4, 0xbc, 0x44, 0x6d, 0xda, 0xf0, - 0x3c, 0x99, 0xa4, 0x67, 0x6c, 0xe6, 0xe0, 0xad, 0x65, 0xf4, 0x25, 0x3c, 0x1a, 0x8e, 0x0e, 0xb2, - 0x0b, 0x2b, 0xbb, 0x1a, 0xb0, 0x13, 0x2e, 0xb6, 0xe0, 0x7f, 0xc0, 0xb6, 0x70, 0x29, 0xbd, 0x80, - 0x29, 0x76, 0x85, 0x4d, 0x28, 0xb8, 0xb9, 0x1a, 0x53, 0xb0, 0x6d, 0xe6, 0x9a, 0xf4, 0x0d, 0x00, - 0x3e, 0x34, 0x52, 0x63, 0x9b, 0x73, 0xd3, 0x07, 0xf6, 0x9f, 0xbf, 0xe8, 0xe7, 0x6e, 0xcd, 0xea, - 0x7a, 0xff, 0x27, 0x3e, 0xd9, 0x1f, 0x62, 0xf2, 0xe3, 0x10, 0x93, 0xdf, 0x87, 0x98, 0xac, 0xcf, - 0xed, 0xe8, 0xeb, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x49, 0xcd, 0xa7, 0x1e, 0x61, 0x02, 0x00, - 0x00, + // 404 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x90, 0xcf, 0x6e, 0xd3, 0x40, + 0x10, 0xc6, 0xbb, 0x4d, 0xd3, 0x3a, 0x93, 0xa4, 0x94, 0x15, 0x07, 0xcb, 0x08, 0xc7, 0x0a, 0x48, + 0xf8, 0x82, 0x23, 0x95, 0x27, 0x68, 0x10, 0x12, 0x08, 0xc1, 0x61, 0xc5, 0x15, 0x59, 0x9b, 0x66, + 0xb2, 0x5e, 0x61, 0x7b, 0xad, 0xdd, 0x6d, 0xd4, 0xbc, 0x05, 0x8f, 0x95, 0x23, 0x4f, 0xc0, 0x9f, + 0x5c, 0x79, 0x09, 0xe4, 0xf5, 0x9f, 0x70, 0xa4, 0xb7, 0xd9, 0x6f, 0xbe, 0x99, 0xf9, 0xf6, 0x07, + 0xe3, 0x72, 0x93, 0x2b, 0x91, 0x54, 0x5a, 0x59, 0x45, 0x2f, 0xdc, 0xa3, 0x5a, 0x05, 0x33, 0xa1, + 0x94, 0xc8, 0x71, 0xe1, 0xe4, 0xd5, 0xdd, 0x66, 0x61, 0x65, 0x81, 0xc6, 0xf2, 0xa2, 0x6a, 0x9c, + 0xc1, 0x13, 0xa1, 0x84, 0x72, 0xe5, 0xa2, 0xae, 0x1a, 0x75, 0xfe, 0x05, 0x3c, 0x86, 0xb7, 0x28, + 0xb7, 0xa8, 0xe9, 0x33, 0x00, 0xa1, 0xd5, 0x5d, 0x95, 0x96, 0xbc, 0x40, 0x9f, 0x44, 0x24, 0x1e, + 0xb1, 0x91, 0x53, 0x3e, 0xf1, 0x02, 0x69, 0x04, 0x63, 0x59, 0x5a, 0x14, 0x9a, 0x5b, 0xa9, 0x4a, + 0xff, 0xd4, 0xf5, 0xff, 0x95, 0xe8, 0x15, 0x0c, 0xe4, 0xfa, 0xde, 0x1f, 0x44, 0x24, 0x9e, 0xb2, + 0xba, 0x9c, 0xff, 0x39, 0x85, 0xe1, 0xdb, 0xd2, 0xea, 0x1d, 0x7d, 0x0a, 0xcd, 0xaa, 0xf4, 0x2b, + 0xee, 0xdc, 0xee, 0x09, 0xf3, 0x9c, 0xf0, 0x01, 0x77, 0xf4, 0x15, 0x78, 0xba, 0x4d, 0xe1, 0xf6, + 0x8e, 0xaf, 0x1f, 0x27, 0xed, 0xc7, 0x92, 0x2e, 0x1e, 0xeb, 0x2d, 0xc7, 0xa0, 0x19, 0x37, 0x99, + 0x3b, 0x37, 0x69, 0x83, 0xbe, 0xe3, 0x26, 0xa3, 0x41, 0xbd, 0xcd, 0xa8, 0x7c, 0x8b, 0x6b, 0xff, + 0x2c, 0x22, 0xb1, 0xc7, 0xfa, 0x37, 0x5d, 0xc2, 0xa8, 0x07, 0xe3, 0x0f, 0xdd, 0xa9, 0x20, 0x69, + 0xd0, 0x25, 0x1d, 0xba, 0xe4, 0x73, 0xe7, 0x58, 0x7a, 0xfb, 0x1f, 0xb3, 0x93, 0x6f, 0x3f, 0x67, + 0x84, 0x1d, 0xc7, 0xe8, 0x73, 0x98, 0x6e, 0xa4, 0x96, 0xa5, 0x48, 0x79, 0x8e, 0xda, 0x1a, 0xff, + 0x3c, 0x1a, 0xc4, 0x67, 0x6c, 0xd2, 0x88, 0x37, 0x4e, 0xa3, 0x2f, 0xe1, 0x51, 0x77, 0xb4, 0xb3, + 0x5d, 0x38, 0xdb, 0x65, 0x27, 0xb7, 0xc6, 0xf7, 0x30, 0x5d, 0x4b, 0x53, 0x71, 0x7b, 0x9b, 0xa5, + 0xf5, 0x0d, 0xdf, 0x7b, 0x40, 0xaa, 0x49, 0x37, 0x5a, 0x37, 0xe7, 0x5b, 0x18, 0x7d, 0x44, 0x93, + 0x35, 0xc0, 0x5f, 0xc0, 0x10, 0xeb, 0xc2, 0xc1, 0x1e, 0x5f, 0x5f, 0xf6, 0x40, 0x5d, 0x9b, 0x35, + 0x4d, 0xfa, 0x06, 0x00, 0xef, 0x2b, 0xa9, 0xd1, 0xa4, 0xdc, 0xb6, 0xec, 0xff, 0x13, 0x48, 0x3b, + 0x77, 0x63, 0x97, 0x57, 0xfb, 0xdf, 0xe1, 0xc9, 0xfe, 0x10, 0x92, 0xef, 0x87, 0x90, 0xfc, 0x3a, + 0x84, 0x64, 0x75, 0xee, 0x46, 0x5f, 0xff, 0x0d, 0x00, 0x00, 0xff, 0xff, 0xbc, 0x92, 0xee, 0xef, + 0xac, 0x02, 0x00, 0x00, } func (m *Receiver) Marshal() (dAtA []byte, err error) { @@ -284,48 +287,56 @@ func (m *Entry) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + n1, err1 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.DispatchTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.DispatchTime):]) + if err1 != nil { + return 0, err1 + } + i -= n1 + i = encodeVarintNflog(dAtA, i, uint64(n1)) + i-- + dAtA[i] = 0x42 if len(m.ResolvedAlerts) > 0 { - dAtA2 := make([]byte, len(m.ResolvedAlerts)*10) - var j1 int + dAtA3 := make([]byte, len(m.ResolvedAlerts)*10) + var j2 int for _, num := range m.ResolvedAlerts { for num >= 1<<7 { - dAtA2[j1] = uint8(uint64(num)&0x7f | 0x80) + dAtA3[j2] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j1++ + j2++ } - dAtA2[j1] = uint8(num) - j1++ + dAtA3[j2] = uint8(num) + j2++ } - i -= j1 - copy(dAtA[i:], dAtA2[:j1]) - i = encodeVarintNflog(dAtA, i, uint64(j1)) + i -= j2 + copy(dAtA[i:], dAtA3[:j2]) + i = encodeVarintNflog(dAtA, i, uint64(j2)) i-- dAtA[i] = 0x3a } if len(m.FiringAlerts) > 0 { - dAtA4 := make([]byte, len(m.FiringAlerts)*10) - var j3 int + dAtA5 := make([]byte, len(m.FiringAlerts)*10) + var j4 int for _, num := range m.FiringAlerts { for num >= 1<<7 { - dAtA4[j3] = uint8(uint64(num)&0x7f | 0x80) + dAtA5[j4] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j3++ + j4++ } - dAtA4[j3] = uint8(num) - j3++ + dAtA5[j4] = uint8(num) + j4++ } - i -= j3 - copy(dAtA[i:], dAtA4[:j3]) - i = encodeVarintNflog(dAtA, i, uint64(j3)) + i -= j4 + copy(dAtA[i:], dAtA5[:j4]) + i = encodeVarintNflog(dAtA, i, uint64(j4)) i-- dAtA[i] = 0x32 } - n5, err5 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp):]) - if err5 != nil { - return 0, err5 + n6, err6 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp):]) + if err6 != nil { + return 0, err6 } - i -= n5 - i = encodeVarintNflog(dAtA, i, uint64(n5)) + i -= n6 + i = encodeVarintNflog(dAtA, i, uint64(n6)) i-- dAtA[i] = 0x2a if m.Resolved { @@ -391,12 +402,12 @@ func (m *MeshEntry) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } - n7, err7 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.ExpiresAt, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.ExpiresAt):]) - if err7 != nil { - return 0, err7 + n8, err8 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.ExpiresAt, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.ExpiresAt):]) + if err8 != nil { + return 0, err8 } - i -= n7 - i = encodeVarintNflog(dAtA, i, uint64(n7)) + i -= n8 + i = encodeVarintNflog(dAtA, i, uint64(n8)) i-- dAtA[i] = 0x12 if m.Entry != nil { @@ -485,6 +496,8 @@ func (m *Entry) Size() (n int) { } n += 1 + sovNflog(uint64(l)) + l } + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.DispatchTime) + n += 1 + l + sovNflog(uint64(l)) if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -987,6 +1000,39 @@ func (m *Entry) Unmarshal(dAtA []byte) error { } else { return fmt.Errorf("proto: wrong wireType = %d for field ResolvedAlerts", wireType) } + case 8: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DispatchTime", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNflog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthNflog + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthNflog + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.DispatchTime, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipNflog(dAtA[iNdEx:]) diff --git a/nflog/nflogpb/nflog.proto b/nflog/nflogpb/nflog.proto index eb4fd8ba9e..4a39e4694d 100644 --- a/nflog/nflogpb/nflog.proto +++ b/nflog/nflogpb/nflog.proto @@ -39,6 +39,8 @@ message Entry { repeated uint64 firing_alerts = 6; // ResolvedAlerts list of hashes of resolved alerts at the last notification time. repeated uint64 resolved_alerts = 7; + // Timestamp of the last time the notifications started. + google.protobuf.Timestamp dispatch_time = 8 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; } // MeshEntry is a wrapper message to communicate a notify log diff --git a/notify/notify.go b/notify/notify.go index d1065ab793..88af5ca0fd 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -112,6 +112,7 @@ type notifyKey int const ( keyReceiverName notifyKey = iota keyRepeatInterval + keyGroupInterval keyGroupLabels keyGroupKey keyFiringAlerts @@ -157,6 +158,11 @@ func WithRepeatInterval(ctx context.Context, t time.Duration) context.Context { return context.WithValue(ctx, keyRepeatInterval, t) } +// WithGroupInterval populates a context with a repeat interval. +func WithGroupInterval(ctx context.Context, t time.Duration) context.Context { + return context.WithValue(ctx, keyGroupInterval, t) +} + // WithMuteTimeIntervals populates a context with a slice of mute time names. func WithMuteTimeIntervals(ctx context.Context, mt []string) context.Context { return context.WithValue(ctx, keyMuteTimeIntervals, mt) @@ -177,6 +183,13 @@ func RepeatInterval(ctx context.Context) (time.Duration, bool) { return v, ok } +// GroupInterval extracts a group interval from the context. Iff none exists, the +// second argument is false. +func GroupInterval(ctx context.Context) (time.Duration, bool) { + v, ok := ctx.Value(keyGroupInterval).(time.Duration) + return v, ok +} + // ReceiverName extracts a receiver name from the context. Iff none exists, the // second argument is false. func ReceiverName(ctx context.Context) (string, bool) { @@ -254,7 +267,7 @@ func (f StageFunc) Exec(ctx context.Context, l log.Logger, alerts ...*types.Aler } type NotificationLog interface { - Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error + Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration, dispatchTime time.Time) error Query(params ...nflog.QueryParam) ([]*nflogpb.Entry, error) } @@ -664,14 +677,16 @@ func hashAlert(a *types.Alert) uint64 { return hash } -func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool { +func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, dispatchTime time.Time, repeat, groupInterval time.Duration) bool { // If we haven't notified about the alert group before, notify right away // unless we only have resolved alerts. if entry == nil { return len(firing) > 0 } - if !entry.IsFiringSubset(firing) { + groupIntervalMuted := len(entry.FiringAlerts) > 0 && entry.DispatchTime.After(dispatchTime.Add(-groupInterval)) + + if !entry.IsFiringSubset(firing) && !groupIntervalMuted { return true } @@ -686,7 +701,7 @@ func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint return len(entry.FiringAlerts) > 0 } - if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) { + if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) && !groupIntervalMuted { return true } @@ -705,6 +720,14 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al if !ok { return ctx, nil, errors.New("repeat interval missing") } + groupInterval, ok := GroupInterval(ctx) + if !ok { + return ctx, nil, errors.New("group interval missing") + } + now, ok := Now(ctx) + if !ok { + return ctx, nil, errors.New("dispatch time missing") + } firingSet := map[uint64]struct{}{} resolvedSet := map[uint64]struct{}{} @@ -740,7 +763,7 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries)) } - if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) { + if n.needsUpdate(entry, firingSet, resolvedSet, now, repeatInterval, groupInterval) { return ctx, alerts, nil } return ctx, nil, nil @@ -931,7 +954,12 @@ func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*typ } expiry := 2 * repeat - return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved, expiry) + now, ok := Now(ctx) + if !ok { + return ctx, nil, errors.New("now time missing") + } + + return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved, expiry, now) } type timeStage struct { diff --git a/notify/notify_test.go b/notify/notify_test.go index 5c2297e993..1446d9ee35 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -68,7 +68,7 @@ func (l *testNflog) Query(p ...nflog.QueryParam) ([]*nflogpb.Entry, error) { return l.qres, l.qerr } -func (l *testNflog) Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error { +func (l *testNflog) Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration, dispatchTime time.Time) error { return l.logFunc(r, gkey, firingAlerts, resolvedAlerts, expiry) } @@ -210,7 +210,7 @@ func TestDedupStageNeedsUpdate(t *testing.T) { now: func() time.Time { return now }, rs: sendResolved(c.resolve), } - res := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, c.repeat) + res := s.needsUpdate(c.entry, c.firingAlerts, c.resolvedAlerts, now, c.repeat, time.Second*0) require.Equal(t, c.res, res) } } @@ -242,6 +242,16 @@ func TestDedupStage(t *testing.T) { ctx = WithRepeatInterval(ctx, time.Hour) + _, _, err = s.Exec(ctx, log.NewNopLogger()) + require.EqualError(t, err, "group interval missing") + + ctx = WithGroupInterval(ctx, time.Second*0) + + _, _, err = s.Exec(ctx, log.NewNopLogger()) + require.EqualError(t, err, "dispatch time missing") + + ctx = WithNow(ctx, now) + alerts := []*types.Alert{{}, {}, {}} // Must catch notification log query errors. @@ -629,6 +639,7 @@ func TestSetNotifiesStage(t *testing.T) { ctx = WithResolvedAlerts(ctx, []uint64{}) ctx = WithRepeatInterval(ctx, time.Hour) + ctx = WithNow(ctx, time.Now()) tnflog.logFunc = func(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error { require.Equal(t, s.recv, r) diff --git a/test/cli/acceptance/cli_test.go b/test/cli/acceptance/cli_test.go index 93afe7884f..7367bdf6ad 100644 --- a/test/cli/acceptance/cli_test.go +++ b/test/cli/acceptance/cli_test.go @@ -271,3 +271,46 @@ receivers: require.EqualError(t, err, "exit status 1") require.Equal(t, "amtool: error: Failed to parse labels: unexpected open or close brace: {foo=bar}\n\n", string(out)) } + +func TestGroupingOnConfigReload(t *testing.T) { + t.Parallel() + + conf := ` +route: + receiver: "default" + group_by: [alertname] + group_wait: 2s + group_interval: 1h + repeat_interval: 4h + +receivers: +- name: "default" + webhook_configs: + - url: 'http://%s' + send_resolved: true +` + + at := NewAcceptanceTest(t, &AcceptanceOpts{ + Tolerance: 150 * time.Millisecond, + }) + co := at.Collector("webhook") + wh := NewWebhook(co) + + amc := at.AlertmanagerCluster(fmt.Sprintf(conf, wh.Address()), 1) + am := amc.Members()[0] + + alert1 := Alert("alertname", "test1", "tag", "one").Active(1, 400) + am.AddAlertsAt(false, 0, alert1) + co.Want(Between(1, 3), Alert("alertname", "test1", "tag", "one").Active(1)) + + alert2 := Alert("alertname", "test1", "tag", "two").Active(4, 402) + am.AddAlertsAt(false, 4, alert2) + co.Want(Between(4, 8)) + + // Force a config re-load + at.Do(5, func() { amc.Reload() }) + + at.Run() + + t.Log(co.Check()) +}