From b7f26d5b10226aadd70bdf89b60ae514b830edf7 Mon Sep 17 00:00:00 2001 From: wanjunlei Date: Mon, 6 Feb 2023 17:40:24 +0800 Subject: [PATCH] Optimize the logic of notification history Signed-off-by: wanjunlei --- config/samples/bundle.yaml | 2 +- config/samples/template.yaml | 2 +- .../{post-install.yaml => hooks.yaml} | 11 ++-- helm/templates/template.yaml | 2 +- helm/values.yaml | 4 ++ pkg/dispatcher/dispatcher.go | 14 ++++- pkg/history/history.go | 22 ++------ pkg/notify/notifier/dingtalk/dingtalk.go | 50 ++++++++++++----- pkg/notify/notifier/discord/discord.go | 25 +++++++-- pkg/notify/notifier/email/email.go | 15 ++++- pkg/notify/notifier/feishu/feishu.go | 22 +++++++- pkg/notify/notifier/interface.go | 1 + pkg/notify/notifier/pushover/pushover.go | 22 ++++++-- pkg/notify/notifier/slack/slack.go | 14 ++++- pkg/notify/notifier/sms/sms.go | 13 ++++- pkg/notify/notifier/webhook/webhook.go | 11 +++- pkg/notify/notifier/wechat/wechat.go | 39 ++++++++++--- pkg/notify/notify.go | 30 ++++++++-- pkg/route/router.go | 12 ++-- pkg/silence/silence.go | 15 +++-- pkg/stage/{interface.go => stage.go} | 2 +- pkg/template/template.go | 55 ++++++++++++------- pkg/template/types.go | 10 +++- pkg/webhook/v1/handler.go | 1 + 24 files changed, 285 insertions(+), 109 deletions(-) rename helm/templates/{post-install.yaml => hooks.yaml} (81%) rename pkg/stage/{interface.go => stage.go} (96%) diff --git a/config/samples/bundle.yaml b/config/samples/bundle.yaml index b780a0a5..737cdc3c 100644 --- a/config/samples/bundle.yaml +++ b/config/samples/bundle.yaml @@ -4,7 +4,7 @@ data: {{ define "nm.default.message" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ . | message }}{{ end }} {{ define "nm.default.message.cn" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ .MessageCN }}{{ end }} - {{ define "nm.default.subject" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template "nm.default.message" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ .Status }} alerts{{ if gt (len .GroupLabels.SortedPairs) 1 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }} + {{ define "nm.default.subject" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template "nm.default.message" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ if ne (len .Status) 0 }}{{ .Status }} {{ end }}alerts{{ if gt (len .GroupLabels.SortedPairs) 1 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }} {{ define "nm.default.text" }}{{ range .Alerts }}{{ template "nm.default.message" . }} {{ range .Labels.SortedPairs }} {{ .Name | translate }}: {{ .Value }} diff --git a/config/samples/template.yaml b/config/samples/template.yaml index 9a773e72..86d397f8 100644 --- a/config/samples/template.yaml +++ b/config/samples/template.yaml @@ -4,7 +4,7 @@ data: {{ define "nm.default.message" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ . | message }}{{ end }} {{ define "nm.default.message.cn" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ .MessageCN }}{{ end }} - {{ define "nm.default.subject" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template "nm.default.message" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ .Status }} alerts{{ if gt (len .GroupLabels.SortedPairs) 1 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }} + {{ define "nm.default.subject" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template "nm.default.message" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ if ne (len .Status) 0 }}{{ .Status }} {{ end }}alerts{{ if gt (len .GroupLabels.SortedPairs) 1 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }} {{ define "nm.default.text" }}{{ range .Alerts }}{{ template "nm.default.message" . }} {{ range .Labels.SortedPairs }} {{ .Name | translate }}: {{ .Value }} diff --git a/helm/templates/post-install.yaml b/helm/templates/hooks.yaml similarity index 81% rename from helm/templates/post-install.yaml rename to helm/templates/hooks.yaml index 7852a230..e8c01188 100644 --- a/helm/templates/post-install.yaml +++ b/helm/templates/hooks.yaml @@ -5,7 +5,7 @@ metadata: annotations: "helm.sh/hook": post-install "helm.sh/hook-weight": "1" - "helm.sh/hook-delete-policy": hook-succeeded,hook-failed + "helm.sh/hook-delete-policy": hook-succeeded,hook-failed,before-hook-creation --- apiVersion: rbac.authorization.k8s.io/v1 @@ -15,7 +15,7 @@ metadata: annotations: "helm.sh/hook": post-install "helm.sh/hook-weight": "2" - "helm.sh/hook-delete-policy": hook-succeeded,hook-failed + "helm.sh/hook-delete-policy": hook-succeeded,hook-failed,before-hook-creation rules: - apiGroups: - apiextensions.k8s.io @@ -38,7 +38,7 @@ metadata: annotations: "helm.sh/hook": post-install "helm.sh/hook-weight": "3" - "helm.sh/hook-delete-policy": hook-succeeded,hook-failed + "helm.sh/hook-delete-policy": hook-succeeded,hook-failed,before-hook-creation subjects: - kind: ServiceAccount name: {{ .Release.Name }}-post-install @@ -56,7 +56,7 @@ metadata: annotations: "helm.sh/hook": post-install "helm.sh/hook-weight": "4" - "helm.sh/hook-delete-policy": hook-succeeded,hook-failed + "helm.sh/hook-delete-policy": hook-succeeded,hook-failed,before-hook-creation spec: backoffLimit: {{ .Values.hook.postInstall.backoffLimit }} template: @@ -67,7 +67,8 @@ spec: serviceAccountName: {{ .Release.Name }}-post-install containers: - name: post-install-job - image: "bitnami/kubectl:1.23.6" + image: {{ .Values.hook.postInstall.image.repo }}:{{ .Values.hook.postInstall.image.tag }} + imagePullPolicy: {{ .Values.hook.postInstall.image.pullPolicy }} command: - /bin/sh - -c diff --git a/helm/templates/template.yaml b/helm/templates/template.yaml index 8fbf256f..40577d1a 100644 --- a/helm/templates/template.yaml +++ b/helm/templates/template.yaml @@ -4,7 +4,7 @@ data: {{ "{{ define \"nm.default.message\" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ . | message }}{{ end }}" }} {{ "{{ define \"nm.default.message.cn\" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ .MessageCN }}{{ end }}" }} - {{ "{{ define \"nm.default.subject\" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template \"nm.default.message\" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ .Status }} alerts{{ if gt (len .GroupLabels.SortedPairs) 1 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }}" }} + {{ "{{ define \"nm.default.subject\" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template \"nm.default.message\" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ if ne (len .Status) 0 }}{{ .Status }} {{ end }}alerts{{ if gt (len .GroupLabels.SortedPairs) 1 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }}" }} {{ "{{ define \"nm.default.text\" }}{{ range .Alerts }}{{ template \"nm.default.message\" . }}" }} {{ "{{ range .Labels.SortedPairs }} {{ .Name | translate }}: {{ .Value }}" }} diff --git a/helm/values.yaml b/helm/values.yaml index e721230f..9be72d49 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -3,6 +3,10 @@ kubesphere: false hook: postInstall: + image: + repo: kubesphere/kubectl + tag: v1.22.0 + pullPolicy: IfNotPresent backoffLimit: 1 # Set timezone to be injected into containers diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index a4f58025..9e988587 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -122,12 +122,20 @@ func (d *Dispatcher) worker(ctx context.Context, data interface{}, stopCh chan s pipeline = append(pipeline, aggregation.NewStage(d.notifierCtl)) // Notify stage pipeline = append(pipeline, notify.NewStage(d.notifierCtl)) - // History stage - pipeline = append(pipeline, history.NewStage(d.notifierCtl)) - if _, _, err := pipeline.Exec(ctx, d.l, data); err != nil { + _, output, err := pipeline.Exec(ctx, d.l, data) + if err != nil { _ = level.Error(d.l).Log("msg", "Dispatcher: process alerts failed", "seq", ctx.Value("seq")) } + go d.execHistoryStage(ctx.Value("seq"), output) + stopCh <- struct{}{} } + +func (d *Dispatcher) execHistoryStage(seq, input interface{}) { + s := history.NewStage(d.notifierCtl) + if _, _, err := s.Exec(context.WithValue(context.Background(), "seq", seq), d.l, input); err != nil { + _ = level.Error(d.l).Log("msg", "Dispatcher: exec history stage failed", "seq", seq) + } +} diff --git a/pkg/history/history.go b/pkg/history/history.go index c185e17b..8c776817 100644 --- a/pkg/history/history.go +++ b/pkg/history/history.go @@ -11,7 +11,6 @@ import ( "github.com/kubesphere/notification-manager/pkg/notify" "github.com/kubesphere/notification-manager/pkg/stage" "github.com/kubesphere/notification-manager/pkg/template" - "github.com/kubesphere/notification-manager/pkg/utils" "github.com/modern-go/reflect2" ) @@ -31,7 +30,6 @@ func NewStage(notifierCtl *controller.Controller) stage.Stage { } func (s *historyStage) Exec(ctx context.Context, l log.Logger, data interface{}) (context.Context, interface{}, error) { - if reflect2.IsNil(data) { return ctx, nil, nil } @@ -43,27 +41,19 @@ func (s *historyStage) Exec(ctx context.Context, l log.Logger, data interface{}) _ = level.Debug(l).Log("msg", "Start history stage", "seq", ctx.Value("seq")) - alertMap := data.(map[internal.Receiver][]*template.Data) - m := make(map[string]*template.Alert) - for _, v := range alertMap { - for _, d := range v { - for _, alert := range d.Alerts { - hash := utils.Hash(alert) - m[hash] = alert - } - } - } - + input := data.(map[string]*template.Alert) d := &template.Data{} - for _, v := range m { - d.Alerts = append(d.Alerts, v) + for _, alert := range input { + if alert.NotifySuccessful { + d.Alerts = append(d.Alerts, alert) + } } if len(d.Alerts) == 0 { return ctx, nil, nil } - alertMap = make(map[internal.Receiver][]*template.Data) + alertMap := make(map[internal.Receiver][]*template.Data) for _, receiver := range receivers { alertMap[receiver] = []*template.Data{d} } diff --git a/pkg/notify/notifier/dingtalk/dingtalk.go b/pkg/notify/notifier/dingtalk/dingtalk.go index 3e47b4dc..bf79994b 100644 --- a/pkg/notify/notifier/dingtalk/dingtalk.go +++ b/pkg/notify/notifier/dingtalk/dingtalk.go @@ -57,6 +57,8 @@ type Notifier struct { conversationThreshold int conversationUnit time.Duration conversationMaxWaitTime time.Duration + + sentSuccessfulHandler *func([]*template.Alert) } type dingtalkText struct { @@ -216,6 +218,10 @@ func NewDingTalkNotifier(logger log.Logger, receiver internal.Receiver, notifier return n, nil } +func (n *Notifier) SetSentSuccessfulHandler(h *func([]*template.Alert)) { + n.sentSuccessfulHandler = h +} + func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { group := async.NewGroup(ctx) @@ -357,27 +363,35 @@ func (n *Notifier) sendToChatBot(ctx context.Context, data *template.Data) error maxSize = maxSize - len(atMobiles) } - messages, titles, err := n.tmpl.Split(data, n.chatbotMessageMaxSize-len(keywords)-len(atMobiles), n.receiver.TmplName, n.receiver.TitleTmplName, n.logger) + splitData, err := n.tmpl.Split(data, n.chatbotMessageMaxSize-len(keywords)-len(atMobiles), n.receiver.TmplName, n.receiver.TitleTmplName, n.logger) if err != nil { _ = level.Error(n.logger).Log("msg", "DingTalkNotifier: split message error", "error", err.Error()) return err } group := async.NewGroup(ctx) - for index := range messages { - title := titles[index] - msg := fmt.Sprintf("%s%s", messages[index], keywords) + for index := range splitData { + d := splitData[index] + alerts := d.Alerts + title := d.Title + msg := fmt.Sprintf("%s%s", d.Message, keywords) if n.receiver.TmplType == constants.Markdown { msg = fmt.Sprintf("%s %s", msg, atMobiles) } group.Add(func(stopCh chan interface{}) { n.throttle.TryAdd(webhook, n.chatbotThreshold, n.chatbotUnit, n.chatbotMaxWaitTime) - if n.throttle.Allow(webhook, n.logger) { - stopCh <- send(title, msg) - } else { + if !n.throttle.Allow(webhook, n.logger) { _ = level.Error(n.logger).Log("msg", "DingTalkNotifier: message to chatbot dropped because of flow control") stopCh <- utils.Error("") } + + err := send(title, msg) + if err == nil { + if n.sentSuccessfulHandler != nil { + (*n.sentSuccessfulHandler)(alerts) + } + } + stopCh <- err }) } @@ -486,26 +500,34 @@ func (n *Notifier) sendToConversation(ctx context.Context, data *template.Data) return nil } - messages, titles, err := n.tmpl.Split(data, n.conversationMessageMaxSize, n.receiver.TmplName, n.receiver.TitleTmplName, n.logger) + splitData, err := n.tmpl.Split(data, n.conversationMessageMaxSize, n.receiver.TmplName, n.receiver.TitleTmplName, n.logger) if err != nil { _ = level.Error(n.logger).Log("msg", "DingTalkNotifier: split message error", "error", err.Error()) return nil } group := async.NewGroup(ctx) - for index := range messages { - title := titles[index] - msg := messages[index] + for index := range splitData { + d := splitData[index] + alerts := d.Alerts + title := d.Title + msg := d.Message for _, chatID := range n.receiver.ChatIDs { id := chatID group.Add(func(stopCh chan interface{}) { n.throttle.TryAdd(appkey, n.conversationThreshold, n.conversationUnit, n.conversationMaxWaitTime) - if n.throttle.Allow(appkey, n.logger) { - stopCh <- send(id, title, msg) - } else { + if !n.throttle.Allow(appkey, n.logger) { _ = level.Error(n.logger).Log("msg", "DingTalkNotifier: message to conversation dropped because of flow control", "conversation", chatID) stopCh <- utils.Error("") } + + err := send(id, title, msg) + if err == nil { + if n.sentSuccessfulHandler != nil { + (*n.sentSuccessfulHandler)(alerts) + } + } + stopCh <- err }) } } diff --git a/pkg/notify/notifier/discord/discord.go b/pkg/notify/notifier/discord/discord.go index be39f84d..93b1128d 100644 --- a/pkg/notify/notifier/discord/discord.go +++ b/pkg/notify/notifier/discord/discord.go @@ -32,6 +32,8 @@ type Notifier struct { timeout time.Duration logger log.Logger tmpl *template.Template + + sentSuccessfulHandler *func([]*template.Alert) } type Message struct { Content string `json:"content"` @@ -89,6 +91,10 @@ func NewDiscordNotifier(logger log.Logger, receiver internal.Receiver, notifierC return n, nil } +func (n *Notifier) SetSentSuccessfulHandler(h *func([]*template.Alert)) { + n.sentSuccessfulHandler = h +} + func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { mentionedUsers := n.receiver.MentionedUsers @@ -113,7 +119,7 @@ func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { } else { length = EmbedLimit - len(atUsers) - len(atRoles) } - messages, _, err := n.tmpl.Split(data, length, n.receiver.TmplName, "", n.logger) + splitData, err := n.tmpl.Split(data, length, n.receiver.TmplName, "", n.logger) if err != nil { _ = level.Error(n.logger).Log("msg", "DiscordNotifier: generate message error", "error", err.Error()) return err @@ -121,10 +127,19 @@ func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { group := async.NewGroup(ctx) if n.receiver.Webhook != nil { - for index := range messages { + for index := range splitData { + d := splitData[index] + alerts := d.Alerts + msg := d.Message group.Add(func(stopCh chan interface{}) { - msg := fmt.Sprintf("%s\n%s%s", messages[index], atUsers, atRoles) - stopCh <- n.sendTo(ctx, msg) + msg := fmt.Sprintf("%s\n%s%s", msg, atUsers, atRoles) + err := n.sendTo(ctx, msg) + if err == nil { + if n.sentSuccessfulHandler != nil { + (*n.sentSuccessfulHandler)(alerts) + } + } + stopCh <- err }) } } @@ -165,7 +180,7 @@ func (n *Notifier) sendTo(ctx context.Context, content string) error { return true, err } code := resp.StatusCode - level.Debug(n.logger).Log("msg", "DiscordNotifier", "response code:", code) + _ = level.Debug(n.logger).Log("msg", "DiscordNotifier", "response code:", code) if code != http.StatusNoContent { return false, fmt.Errorf("DiscordNotifier: send message error, code: %d", code) diff --git a/pkg/notify/notifier/email/email.go b/pkg/notify/notifier/email/email.go index 3e2384bd..2d93160b 100644 --- a/pkg/notify/notifier/email/email.go +++ b/pkg/notify/notifier/email/email.go @@ -49,6 +49,8 @@ type Notifier struct { delivery string // The maximum size of receivers in one email. maxEmailReceivers int + + sentSuccessfulHandler *func([]*template.Alert) } func NewEmailNotifier(logger log.Logger, receiver internal.Receiver, notifierCtl *controller.Controller) (notifier.Notifier, error) { @@ -126,6 +128,10 @@ func NewEmailNotifier(logger log.Logger, receiver internal.Receiver, notifierCtl return n, nil } +func (n *Notifier) SetSentSuccessfulHandler(h *func([]*template.Alert)) { + n.sentSuccessfulHandler = h +} + func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { sendEmail := func(to, subject, body string) error { @@ -165,7 +171,14 @@ func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { for _, t := range n.receiver.To { to := t group.Add(func(stopCh chan interface{}) { - stopCh <- sendEmail(to, subject, body) + err := sendEmail(to, subject, body) + if err == nil { + if n.sentSuccessfulHandler != nil { + (*n.sentSuccessfulHandler)(data.Alerts) + } + } + stopCh <- err + }) } diff --git a/pkg/notify/notifier/feishu/feishu.go b/pkg/notify/notifier/feishu/feishu.go index abfb5414..f88d7e6a 100644 --- a/pkg/notify/notifier/feishu/feishu.go +++ b/pkg/notify/notifier/feishu/feishu.go @@ -42,6 +42,8 @@ type Notifier struct { tmpl *template.Template ats *notifier.AccessTokenService tokenExpires time.Duration + + sentSuccessfulHandler *func([]*template.Alert) } type Message struct { @@ -134,6 +136,10 @@ func NewFeishuNotifier(logger log.Logger, receiver internal.Receiver, notifierCt return n, nil } +func (n *Notifier) SetSentSuccessfulHandler(h *func([]*template.Alert)) { + n.sentSuccessfulHandler = h +} + func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { content, err := n.tmpl.Text(n.receiver.TmplName, data) @@ -145,13 +151,25 @@ func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { group := async.NewGroup(ctx) if n.receiver.ChatBot != nil { group.Add(func(stopCh chan interface{}) { - stopCh <- n.sendToChatBot(ctx, content) + err := n.sendToChatBot(ctx, content) + if err == nil { + if n.sentSuccessfulHandler != nil { + (*n.sentSuccessfulHandler)(data.Alerts) + } + } + stopCh <- err }) } if len(n.receiver.User) > 0 || len(n.receiver.Department) > 0 { group.Add(func(stopCh chan interface{}) { - stopCh <- n.batchSend(ctx, content) + err := n.batchSend(ctx, content) + if err == nil { + if n.sentSuccessfulHandler != nil { + (*n.sentSuccessfulHandler)(data.Alerts) + } + } + stopCh <- err }) } diff --git a/pkg/notify/notifier/interface.go b/pkg/notify/notifier/interface.go index a16fde22..53e0b2f5 100644 --- a/pkg/notify/notifier/interface.go +++ b/pkg/notify/notifier/interface.go @@ -8,4 +8,5 @@ import ( type Notifier interface { Notify(ctx context.Context, data *template.Data) error + SetSentSuccessfulHandler(*func([]*template.Alert)) } diff --git a/pkg/notify/notifier/pushover/pushover.go b/pkg/notify/notifier/pushover/pushover.go index acfef924..f112ce03 100644 --- a/pkg/notify/notifier/pushover/pushover.go +++ b/pkg/notify/notifier/pushover/pushover.go @@ -37,6 +37,8 @@ type Notifier struct { timeout time.Duration logger log.Logger tmpl *template.Template + + sentSuccessfulHandler *func([]*template.Alert) } // Pushover message struct @@ -120,6 +122,10 @@ func NewPushoverNotifier(logger log.Logger, receiver internal.Receiver, notifier return n, nil } +func (n *Notifier) SetSentSuccessfulHandler(h *func([]*template.Alert)) { + n.sentSuccessfulHandler = h +} + // Notify sends messages to Pushover server. // The logic to preprocess the pushover notification messages are as below: // - The alert messages are filtered by AlertSelector first before sending to Pushover. @@ -226,19 +232,27 @@ func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { } // split new data along with its Alerts to ensure each message is small enough to fit the Pushover's message length limit - messages, _, err := n.tmpl.Split(data, MessageMaxLength, n.receiver.TmplName, "", n.logger) + splitData, err := n.tmpl.Split(data, MessageMaxLength, n.receiver.TmplName, "", n.logger) if err != nil { _ = level.Error(n.logger).Log("msg", "PushoverNotifier: split alerts error", "error", err.Error()) return err } group := async.NewGroup(ctx) - for _, m := range messages { - message := m + for index := range splitData { + d := splitData[index] + alerts := d.Alerts + message := d.Message for _, p := range n.receiver.Profiles { profile := p group.Add(func(stopCh chan interface{}) { - stopCh <- send(profile, title, message) + err := send(profile, title, message) + if err == nil { + if n.sentSuccessfulHandler != nil { + (*n.sentSuccessfulHandler)(alerts) + } + } + stopCh <- err }) } } diff --git a/pkg/notify/notifier/slack/slack.go b/pkg/notify/notifier/slack/slack.go index feaa1b0a..3d9e0c83 100644 --- a/pkg/notify/notifier/slack/slack.go +++ b/pkg/notify/notifier/slack/slack.go @@ -29,6 +29,8 @@ type Notifier struct { timeout time.Duration logger log.Logger tmpl *template.Template + + sentSuccessfulHandler *func([]*template.Alert) } type slackRequest struct { @@ -86,6 +88,10 @@ func NewSlackNotifier(logger log.Logger, receiver internal.Receiver, notifierCtl return n, nil } +func (n *Notifier) SetSentSuccessfulHandler(h *func([]*template.Alert)) { + n.sentSuccessfulHandler = h +} + func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { msg, err := n.tmpl.Text(n.receiver.TmplName, data) @@ -151,7 +157,13 @@ func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { for _, channel := range n.receiver.Channels { ch := channel group.Add(func(stopCh chan interface{}) { - stopCh <- send(ch) + err := send(ch) + if err == nil { + if n.sentSuccessfulHandler != nil { + (*n.sentSuccessfulHandler)(data.Alerts) + } + } + stopCh <- err }) } diff --git a/pkg/notify/notifier/sms/sms.go b/pkg/notify/notifier/sms/sms.go index a7c8630a..11f10016 100644 --- a/pkg/notify/notifier/sms/sms.go +++ b/pkg/notify/notifier/sms/sms.go @@ -27,6 +27,8 @@ type Notifier struct { timeout time.Duration logger log.Logger tmpl *template.Template + + sentSuccessfulHandler *func([]*template.Alert) } func NewSmsNotifier(logger log.Logger, receiver internal.Receiver, notifierCtl *controller.Controller) (notifier.Notifier, error) { @@ -74,6 +76,10 @@ func NewSmsNotifier(logger log.Logger, receiver internal.Receiver, notifierCtl * return n, nil } +func (n *Notifier) SetSentSuccessfulHandler(h *func([]*template.Alert)) { + n.sentSuccessfulHandler = h +} + func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { start := time.Now() @@ -105,10 +111,13 @@ func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { _ = level.Error(n.logger).Log("msg", "SmsNotifier: send request failed", "error", err.Error()) return err } - _ = level.Info(n.logger).Log("msg", "SmsNotifier: send request successfully") - return nil + if n.sentSuccessfulHandler != nil { + (*n.sentSuccessfulHandler)(data.Alerts) + } + _ = level.Info(n.logger).Log("msg", "SmsNotifier: send request successfully") + return nil } func stringValue(a *string) string { diff --git a/pkg/notify/notifier/webhook/webhook.go b/pkg/notify/notifier/webhook/webhook.go index db0d2b7e..c2c9a7d8 100644 --- a/pkg/notify/notifier/webhook/webhook.go +++ b/pkg/notify/notifier/webhook/webhook.go @@ -32,6 +32,8 @@ type Notifier struct { timeout time.Duration logger log.Logger tmpl *template.Template + + sentSuccessfulHandler *func([]*template.Alert) } func NewWebhookNotifier(logger log.Logger, receiver internal.Receiver, notifierCtl *controller.Controller) (notifier.Notifier, error) { @@ -74,6 +76,10 @@ func NewWebhookNotifier(logger log.Logger, receiver internal.Receiver, notifierC return n, nil } +func (n *Notifier) SetSentSuccessfulHandler(h *func([]*template.Alert)) { + n.sentSuccessfulHandler = h +} + func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { start := time.Now() @@ -144,8 +150,11 @@ func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { return err } - _ = level.Debug(n.logger).Log("msg", "WebhookNotifier: send message", "to", n.receiver.URL) + if n.sentSuccessfulHandler != nil { + (*n.sentSuccessfulHandler)(data.Alerts) + } + _ = level.Debug(n.logger).Log("msg", "WebhookNotifier: send message", "to", n.receiver.URL) return nil } diff --git a/pkg/notify/notifier/wechat/wechat.go b/pkg/notify/notifier/wechat/wechat.go index acf6cc87..52311b9f 100644 --- a/pkg/notify/notifier/wechat/wechat.go +++ b/pkg/notify/notifier/wechat/wechat.go @@ -45,6 +45,8 @@ type Notifier struct { ats *notifier.AccessTokenService messageMaxSize int tokenExpires time.Duration + + sentSuccessfulHandler *func([]*template.Alert) } type weChatMessageContent struct { @@ -151,6 +153,10 @@ func NewWechatNotifier(logger log.Logger, receiver internal.Receiver, notifierCt return n, nil } +func (n *Notifier) SetSentSuccessfulHandler(h *func([]*template.Alert)) { + n.sentSuccessfulHandler = h +} + func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { send := func(r *wechat.Receiver, msg string) error { @@ -276,7 +282,7 @@ func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { return err } - messages, _, err := n.tmpl.Split(data, MessageMaxSize, n.receiver.TmplName, "", n.logger) + splitData, err := n.tmpl.Split(data, MessageMaxSize, n.receiver.TmplName, "", n.logger) if err != nil { _ = level.Error(n.logger).Log("msg", "WechatNotifier: split message error", "error", err.Error()) return nil @@ -289,7 +295,6 @@ func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { group := async.NewGroup(ctx) if n.receiver.ChatBot != nil { - group.Add(func(stopCh chan interface{}) { stopCh <- n.sendToChatBot(ctx, data) }) @@ -304,10 +309,18 @@ func (n *Notifier) Notify(ctx context.Context, data *template.Data) error { r.ToParty = batch(toParty, &ps, ToPartyBatchSize) r.ToTag = batch(toTag, &ts, ToTagBatchSize) - for index := range messages { - msg := messages[index] + for index := range splitData { + d := splitData[index] + alerts := d.Alerts + msg := d.Message group.Add(func(stopCh chan interface{}) { - stopCh <- send(r, msg) + err := send(r, msg) + if err == nil { + if n.sentSuccessfulHandler != nil { + (*n.sentSuccessfulHandler)(alerts) + } + } + stopCh <- err }) } } @@ -409,19 +422,27 @@ func (n *Notifier) sendToChatBot(ctx context.Context, data *template.Data) error } else { msgSize = ChatbotMessageMaxTextSize } - messages, _, err := n.tmpl.Split(data, msgSize, n.receiver.TmplName, n.receiver.TitleTmplName, n.logger) + splitData, err := n.tmpl.Split(data, msgSize, n.receiver.TmplName, "", n.logger) if err != nil { _ = level.Error(n.logger).Log("msg", "wechatBotNotifier: split message error", "error", err.Error()) return err } group := async.NewGroup(ctx) - for index := range messages { - msg := fmt.Sprintf("%s", messages[index]) + for index := range splitData { + d := splitData[index] + alerts := d.Alerts + msg := d.Message if n.receiver.TmplType == constants.Markdown { msg = fmt.Sprintf("%s\n%s", msg, atUsers) } group.Add(func(stopCh chan interface{}) { - stopCh <- send(msg) + err := send(msg) + if err == nil { + if n.sentSuccessfulHandler != nil { + (*n.sentSuccessfulHandler)(alerts) + } + } + stopCh <- err }) } diff --git a/pkg/notify/notify.go b/pkg/notify/notify.go index fb945be3..44400ee0 100644 --- a/pkg/notify/notify.go +++ b/pkg/notify/notify.go @@ -2,6 +2,7 @@ package notify import ( "context" + "sync" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -69,11 +70,30 @@ func (s *notifyStage) Exec(ctx context.Context, l log.Logger, data interface{}) _ = level.Debug(l).Log("msg", "Start notify stage", "seq", ctx.Value("seq")) - group := async.NewGroup(ctx) + input := data.(map[internal.Receiver][]*template.Data) + alertMap := make(map[string]*template.Alert) + for _, dataList := range input { + for _, d := range dataList { + for _, alert := range d.Alerts { + alertMap[alert.ID] = alert + } + } + } + + var mutex sync.Mutex + handler := func(alerts []*template.Alert) { + mutex.Lock() + defer mutex.Unlock() - alertMap := data.(map[internal.Receiver][]*template.Data) + for _, alert := range alerts { + if a := alertMap[alert.ID]; a != nil { + a.NotifySuccessful = true + } + } + } - for k, v := range alertMap { + group := async.NewGroup(ctx) + for k, v := range input { receiver := k ds := v nf, err := factories[receiver.GetType()](l, receiver, s.notifierCtl) @@ -84,6 +104,7 @@ func (s *notifyStage) Exec(ctx context.Context, l log.Logger, data interface{}) }) continue } + nf.SetSentSuccessfulHandler(&handler) for _, d := range ds { alert := d @@ -91,8 +112,7 @@ func (s *notifyStage) Exec(ctx context.Context, l log.Logger, data interface{}) stopCh <- nf.Notify(ctx, alert) }) } - } - return ctx, data, group.Wait() + return ctx, alertMap, group.Wait() } diff --git a/pkg/route/router.go b/pkg/route/router.go index 4390e6a8..1de9f3db 100644 --- a/pkg/route/router.go +++ b/pkg/route/router.go @@ -41,9 +41,9 @@ func (s *routeStage) Exec(ctx context.Context, l log.Logger, data interface{}) ( return ctx, nil, nil } - alertArray := data.([]*template.Alert) + input := data.([]*template.Alert) - _ = level.Debug(l).Log("msg", "RouteStage: start", "seq", ctx.Value("seq"), "alert", len(alertArray)) + _ = level.Debug(l).Log("msg", "RouteStage: start", "seq", ctx.Value("seq"), "alert", len(input)) routers, err := s.notifierCtl.GetActiveRouters(ctx) if err != nil { @@ -53,7 +53,7 @@ func (s *routeStage) Exec(ctx context.Context, l log.Logger, data interface{}) ( // Grouping alerts by namespace alertMap := make(map[string][]*template.Alert) - for _, alert := range alertArray { + for _, alert := range input { ns := alert.Labels[constants.Namespace] as := alertMap[ns] as = append(as, alert) @@ -94,12 +94,12 @@ func (s *routeStage) Exec(ctx context.Context, l log.Logger, data interface{}) ( return ctx, nil, nil } - res := make(map[internal.Receiver][]*template.Alert) + output := make(map[internal.Receiver][]*template.Alert) for _, p := range m { - res[p.receiver] = p.alerts + output[p.receiver] = p.alerts } - return ctx, res, nil + return ctx, output, nil } func (s *routeStage) rcvsFromRouter(alert *template.Alert, routers []v2beta2.Router) []internal.Receiver { diff --git a/pkg/silence/silence.go b/pkg/silence/silence.go index 3e3e1be0..6883e4a0 100644 --- a/pkg/silence/silence.go +++ b/pkg/silence/silence.go @@ -23,14 +23,13 @@ func NewStage(notifierCtl *controller.Controller) stage.Stage { } func (s *silenceStage) Exec(ctx context.Context, l log.Logger, data interface{}) (context.Context, interface{}, error) { - if reflect2.IsNil(data) { return ctx, nil, nil } - alerts := data.([]*template.Alert) + input := data.([]*template.Alert) - _ = level.Debug(l).Log("msg", "Start silence stage", "seq", ctx.Value("seq"), "alert", len(alerts)) + _ = level.Debug(l).Log("msg", "Start silence stage", "seq", ctx.Value("seq"), "alert", len(input)) ss, err := s.notifierCtl.GetActiveSilences(ctx, "") if err != nil { @@ -39,11 +38,11 @@ func (s *silenceStage) Exec(ctx context.Context, l log.Logger, data interface{}) } if len(ss) == 0 { - return ctx, alerts, nil + return ctx, input, nil } - var as []*template.Alert - for _, alert := range alerts { + var output []*template.Alert + for _, alert := range input { mute := false for _, silence := range ss { if utils.LabelMatchSelector(alert.Labels, silence.Spec.Matcher) { @@ -53,9 +52,9 @@ func (s *silenceStage) Exec(ctx context.Context, l log.Logger, data interface{}) } if !mute { - as = append(as, alert) + output = append(output, alert) } } - return ctx, as, nil + return ctx, output, nil } diff --git a/pkg/stage/interface.go b/pkg/stage/stage.go similarity index 96% rename from pkg/stage/interface.go rename to pkg/stage/stage.go index 9fe44d3c..590cc597 100644 --- a/pkg/stage/interface.go +++ b/pkg/stage/stage.go @@ -25,7 +25,7 @@ func (ms MultiStage) Exec(ctx context.Context, l log.Logger, data interface{}) ( ctx, data, err = s.Exec(ctx, l, data) if err != nil { - return ctx, nil, err + return ctx, data, err } } return ctx, data, nil diff --git a/pkg/template/template.go b/pkg/template/template.go index aad0f040..fe5a6a29 100644 --- a/pkg/template/template.go +++ b/pkg/template/template.go @@ -229,61 +229,74 @@ func (t *Template) transform(name string) string { return fmt.Sprintf("{{ template \"%s\" . }}", name) } -func (t *Template) Split(data *Data, maxSize int, templateName string, subjectTemplateName string, l log.Logger) ([]string, []string, error) { +type DataSlice struct { + *Data + Message string + Title string +} - var messages []string - var subjects []string +func (t *Template) Split(data *Data, maxSize int, templateName string, subjectTemplateName string, l log.Logger) ([]*DataSlice, error) { + var output []*DataSlice lastMsg := "" - lastSubject := "" - d := &Data{ - GroupLabels: data.GroupLabels, - } + lastTitle := "" + var d *Data for i := 0; i < len(data.Alerts); i++ { + if d == nil { + d = &Data{ + GroupLabels: data.GroupLabels, + } + } d.Alerts = append(d.Alerts, data.Alerts[i]) msg, err := t.Text(t.transform(templateName), d.Format()) if err != nil { - return nil, nil, err + return nil, err } - subject := "" + title := "" if subjectTemplateName != "" { - subject, err = t.Text(t.transform(subjectTemplateName), d) + title, err = t.Text(t.transform(subjectTemplateName), d) if err != nil { - return nil, nil, err + return nil, err } } if Len(msg) < maxSize { lastMsg = msg - lastSubject = subject + lastTitle = title continue } // If there is only alert, and the message length is greater than MaxMessageSize, drop this alert. if len(d.Alerts) == 1 { _ = level.Error(l).Log("msg", "alert is too large, drop it") - d.Alerts = nil + d = nil lastMsg = "" - lastSubject = "" + lastTitle = "" continue } - messages = append(messages, lastMsg) - subjects = append(subjects, subject) + output = append(output, &DataSlice{ + Data: d, + Message: lastMsg, + Title: lastTitle, + }) - d.Alerts = nil + d = nil i = i - 1 lastMsg = "" - lastSubject = "" + lastTitle = "" } if len(lastMsg) > 0 { - messages = append(messages, lastMsg) - subjects = append(subjects, lastSubject) + output = append(output, &DataSlice{ + Data: d, + Message: lastMsg, + Title: lastTitle, + }) } - return messages, subjects, nil + return output, nil } // Len return the length of string after serialized. diff --git a/pkg/template/types.go b/pkg/template/types.go index dc865a04..5a399188 100644 --- a/pkg/template/types.go +++ b/pkg/template/types.go @@ -61,10 +61,13 @@ func (d *Data) Format() *Data { } func (d *Data) Status() string { - if len(d.Alerts.Firing()) > 0 { + if len(d.Alerts.Firing()) == len(d.Alerts) { return constants.AlertFiring + } else if len(d.Alerts.Resolved()) == len(d.Alerts) { + return constants.AlertResolved + } else { + return "" } - return constants.AlertResolved } // Pair is a key/value string pair. @@ -157,12 +160,15 @@ func (kv KV) Clone() KV { } type Alert struct { + ID string `json:"id"` Status string `json:"status"` Labels KV `json:"labels"` Annotations KV `json:"annotations"` StartsAt time.Time `json:"startsAt,omitempty"` EndsAt time.Time `json:"endsAt,omitempty"` + + NotifySuccessful bool } func (a *Alert) Message() string { diff --git a/pkg/webhook/v1/handler.go b/pkg/webhook/v1/handler.go index 0bd91f03..f4bf7872 100644 --- a/pkg/webhook/v1/handler.go +++ b/pkg/webhook/v1/handler.go @@ -66,6 +66,7 @@ func (h *HttpHandler) Alert(w http.ResponseWriter, r *http.Request) { if v := alert.Labels["cluster"]; v == "" { alert.Labels["cluster"] = cluster } + alert.ID = utils.Hash(alert) if err := h.alerts.Push(alert); err != nil { _ = level.Error(h.logger).Log("msg", "push alert error", "error", err.Error()) }