Skip to content

Commit

Permalink
Merge pull request #193 from wanjunlei/master
Browse files Browse the repository at this point in the history
Optimize the logic of notification history
  • Loading branch information
benjaminhuo committed Feb 7, 2023
2 parents 2540744 + b7f26d5 commit 6955b97
Show file tree
Hide file tree
Showing 24 changed files with 285 additions and 109 deletions.
2 changes: 1 addition & 1 deletion config/samples/bundle.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ data:
{{ define "nm.default.message" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ . | message }}{{ end }}
{{ define "nm.default.message.cn" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ .MessageCN }}{{ end }}
{{ define "nm.default.subject" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template "nm.default.message" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ .Status }} alerts{{ if gt (len .GroupLabels.SortedPairs) 1 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }}
{{ define "nm.default.subject" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template "nm.default.message" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ if ne (len .Status) 0 }}{{ .Status }} {{ end }}alerts{{ if gt (len .GroupLabels.SortedPairs) 1 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }}
{{ define "nm.default.text" }}{{ range .Alerts }}{{ template "nm.default.message" . }}
{{ range .Labels.SortedPairs }} {{ .Name | translate }}: {{ .Value }}
Expand Down
2 changes: 1 addition & 1 deletion config/samples/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ data:
{{ define "nm.default.message" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ . | message }}{{ end }}
{{ define "nm.default.message.cn" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ .MessageCN }}{{ end }}
{{ define "nm.default.subject" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template "nm.default.message" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ .Status }} alerts{{ if gt (len .GroupLabels.SortedPairs) 1 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }}
{{ define "nm.default.subject" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template "nm.default.message" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ if ne (len .Status) 0 }}{{ .Status }} {{ end }}alerts{{ if gt (len .GroupLabels.SortedPairs) 1 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }}
{{ define "nm.default.text" }}{{ range .Alerts }}{{ template "nm.default.message" . }}
{{ range .Labels.SortedPairs }} {{ .Name | translate }}: {{ .Value }}
Expand Down
11 changes: 6 additions & 5 deletions helm/templates/post-install.yaml → helm/templates/hooks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
annotations:
"helm.sh/hook": post-install
"helm.sh/hook-weight": "1"
"helm.sh/hook-delete-policy": hook-succeeded,hook-failed
"helm.sh/hook-delete-policy": hook-succeeded,hook-failed,before-hook-creation

---
apiVersion: rbac.authorization.k8s.io/v1
Expand All @@ -15,7 +15,7 @@ metadata:
annotations:
"helm.sh/hook": post-install
"helm.sh/hook-weight": "2"
"helm.sh/hook-delete-policy": hook-succeeded,hook-failed
"helm.sh/hook-delete-policy": hook-succeeded,hook-failed,before-hook-creation
rules:
- apiGroups:
- apiextensions.k8s.io
Expand All @@ -38,7 +38,7 @@ metadata:
annotations:
"helm.sh/hook": post-install
"helm.sh/hook-weight": "3"
"helm.sh/hook-delete-policy": hook-succeeded,hook-failed
"helm.sh/hook-delete-policy": hook-succeeded,hook-failed,before-hook-creation
subjects:
- kind: ServiceAccount
name: {{ .Release.Name }}-post-install
Expand All @@ -56,7 +56,7 @@ metadata:
annotations:
"helm.sh/hook": post-install
"helm.sh/hook-weight": "4"
"helm.sh/hook-delete-policy": hook-succeeded,hook-failed
"helm.sh/hook-delete-policy": hook-succeeded,hook-failed,before-hook-creation
spec:
backoffLimit: {{ .Values.hook.postInstall.backoffLimit }}
template:
Expand All @@ -67,7 +67,8 @@ spec:
serviceAccountName: {{ .Release.Name }}-post-install
containers:
- name: post-install-job
image: "bitnami/kubectl:1.23.6"
image: {{ .Values.hook.postInstall.image.repo }}:{{ .Values.hook.postInstall.image.tag }}
imagePullPolicy: {{ .Values.hook.postInstall.image.pullPolicy }}
command:
- /bin/sh
- -c
Expand Down
2 changes: 1 addition & 1 deletion helm/templates/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ data:
{{ "{{ define \"nm.default.message\" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ . | message }}{{ end }}" }}
{{ "{{ define \"nm.default.message.cn\" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ .MessageCN }}{{ end }}" }}
{{ "{{ define \"nm.default.subject\" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template \"nm.default.message\" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ .Status }} alerts{{ if gt (len .GroupLabels.SortedPairs) 1 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }}" }}
{{ "{{ define \"nm.default.subject\" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template \"nm.default.message\" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ if ne (len .Status) 0 }}{{ .Status }} {{ end }}alerts{{ if gt (len .GroupLabels.SortedPairs) 1 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }}" }}
{{ "{{ define \"nm.default.text\" }}{{ range .Alerts }}{{ template \"nm.default.message\" . }}" }}
{{ "{{ range .Labels.SortedPairs }} {{ .Name | translate }}: {{ .Value }}" }}
Expand Down
4 changes: 4 additions & 0 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ kubesphere: false

hook:
postInstall:
image:
repo: kubesphere/kubectl
tag: v1.22.0
pullPolicy: IfNotPresent
backoffLimit: 1

# Set timezone to be injected into containers
Expand Down
14 changes: 11 additions & 3 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,20 @@ func (d *Dispatcher) worker(ctx context.Context, data interface{}, stopCh chan s
pipeline = append(pipeline, aggregation.NewStage(d.notifierCtl))
// Notify stage
pipeline = append(pipeline, notify.NewStage(d.notifierCtl))
// History stage
pipeline = append(pipeline, history.NewStage(d.notifierCtl))

if _, _, err := pipeline.Exec(ctx, d.l, data); err != nil {
_, output, err := pipeline.Exec(ctx, d.l, data)
if err != nil {
_ = level.Error(d.l).Log("msg", "Dispatcher: process alerts failed", "seq", ctx.Value("seq"))
}

go d.execHistoryStage(ctx.Value("seq"), output)

stopCh <- struct{}{}
}

func (d *Dispatcher) execHistoryStage(seq, input interface{}) {
s := history.NewStage(d.notifierCtl)
if _, _, err := s.Exec(context.WithValue(context.Background(), "seq", seq), d.l, input); err != nil {
_ = level.Error(d.l).Log("msg", "Dispatcher: exec history stage failed", "seq", seq)
}
}
22 changes: 6 additions & 16 deletions pkg/history/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/kubesphere/notification-manager/pkg/notify"
"github.com/kubesphere/notification-manager/pkg/stage"
"github.com/kubesphere/notification-manager/pkg/template"
"github.com/kubesphere/notification-manager/pkg/utils"
"github.com/modern-go/reflect2"
)

Expand All @@ -31,7 +30,6 @@ func NewStage(notifierCtl *controller.Controller) stage.Stage {
}

func (s *historyStage) Exec(ctx context.Context, l log.Logger, data interface{}) (context.Context, interface{}, error) {

if reflect2.IsNil(data) {
return ctx, nil, nil
}
Expand All @@ -43,27 +41,19 @@ func (s *historyStage) Exec(ctx context.Context, l log.Logger, data interface{})

_ = level.Debug(l).Log("msg", "Start history stage", "seq", ctx.Value("seq"))

alertMap := data.(map[internal.Receiver][]*template.Data)
m := make(map[string]*template.Alert)
for _, v := range alertMap {
for _, d := range v {
for _, alert := range d.Alerts {
hash := utils.Hash(alert)
m[hash] = alert
}
}
}

input := data.(map[string]*template.Alert)
d := &template.Data{}
for _, v := range m {
d.Alerts = append(d.Alerts, v)
for _, alert := range input {
if alert.NotifySuccessful {
d.Alerts = append(d.Alerts, alert)
}
}

if len(d.Alerts) == 0 {
return ctx, nil, nil
}

alertMap = make(map[internal.Receiver][]*template.Data)
alertMap := make(map[internal.Receiver][]*template.Data)
for _, receiver := range receivers {
alertMap[receiver] = []*template.Data{d}
}
Expand Down
50 changes: 36 additions & 14 deletions pkg/notify/notifier/dingtalk/dingtalk.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type Notifier struct {
conversationThreshold int
conversationUnit time.Duration
conversationMaxWaitTime time.Duration

sentSuccessfulHandler *func([]*template.Alert)
}

type dingtalkText struct {
Expand Down Expand Up @@ -216,6 +218,10 @@ func NewDingTalkNotifier(logger log.Logger, receiver internal.Receiver, notifier
return n, nil
}

func (n *Notifier) SetSentSuccessfulHandler(h *func([]*template.Alert)) {
n.sentSuccessfulHandler = h
}

func (n *Notifier) Notify(ctx context.Context, data *template.Data) error {

group := async.NewGroup(ctx)
Expand Down Expand Up @@ -357,27 +363,35 @@ func (n *Notifier) sendToChatBot(ctx context.Context, data *template.Data) error
maxSize = maxSize - len(atMobiles)
}

messages, titles, err := n.tmpl.Split(data, n.chatbotMessageMaxSize-len(keywords)-len(atMobiles), n.receiver.TmplName, n.receiver.TitleTmplName, n.logger)
splitData, err := n.tmpl.Split(data, n.chatbotMessageMaxSize-len(keywords)-len(atMobiles), n.receiver.TmplName, n.receiver.TitleTmplName, n.logger)
if err != nil {
_ = level.Error(n.logger).Log("msg", "DingTalkNotifier: split message error", "error", err.Error())
return err
}

group := async.NewGroup(ctx)
for index := range messages {
title := titles[index]
msg := fmt.Sprintf("%s%s", messages[index], keywords)
for index := range splitData {
d := splitData[index]
alerts := d.Alerts
title := d.Title
msg := fmt.Sprintf("%s%s", d.Message, keywords)
if n.receiver.TmplType == constants.Markdown {
msg = fmt.Sprintf("%s %s", msg, atMobiles)
}
group.Add(func(stopCh chan interface{}) {
n.throttle.TryAdd(webhook, n.chatbotThreshold, n.chatbotUnit, n.chatbotMaxWaitTime)
if n.throttle.Allow(webhook, n.logger) {
stopCh <- send(title, msg)
} else {
if !n.throttle.Allow(webhook, n.logger) {
_ = level.Error(n.logger).Log("msg", "DingTalkNotifier: message to chatbot dropped because of flow control")
stopCh <- utils.Error("")
}

err := send(title, msg)
if err == nil {
if n.sentSuccessfulHandler != nil {
(*n.sentSuccessfulHandler)(alerts)
}
}
stopCh <- err
})
}

Expand Down Expand Up @@ -486,26 +500,34 @@ func (n *Notifier) sendToConversation(ctx context.Context, data *template.Data)
return nil
}

messages, titles, err := n.tmpl.Split(data, n.conversationMessageMaxSize, n.receiver.TmplName, n.receiver.TitleTmplName, n.logger)
splitData, err := n.tmpl.Split(data, n.conversationMessageMaxSize, n.receiver.TmplName, n.receiver.TitleTmplName, n.logger)
if err != nil {
_ = level.Error(n.logger).Log("msg", "DingTalkNotifier: split message error", "error", err.Error())
return nil
}

group := async.NewGroup(ctx)
for index := range messages {
title := titles[index]
msg := messages[index]
for index := range splitData {
d := splitData[index]
alerts := d.Alerts
title := d.Title
msg := d.Message
for _, chatID := range n.receiver.ChatIDs {
id := chatID
group.Add(func(stopCh chan interface{}) {
n.throttle.TryAdd(appkey, n.conversationThreshold, n.conversationUnit, n.conversationMaxWaitTime)
if n.throttle.Allow(appkey, n.logger) {
stopCh <- send(id, title, msg)
} else {
if !n.throttle.Allow(appkey, n.logger) {
_ = level.Error(n.logger).Log("msg", "DingTalkNotifier: message to conversation dropped because of flow control", "conversation", chatID)
stopCh <- utils.Error("")
}

err := send(id, title, msg)
if err == nil {
if n.sentSuccessfulHandler != nil {
(*n.sentSuccessfulHandler)(alerts)
}
}
stopCh <- err
})
}
}
Expand Down
25 changes: 20 additions & 5 deletions pkg/notify/notifier/discord/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Notifier struct {
timeout time.Duration
logger log.Logger
tmpl *template.Template

sentSuccessfulHandler *func([]*template.Alert)
}
type Message struct {
Content string `json:"content"`
Expand Down Expand Up @@ -89,6 +91,10 @@ func NewDiscordNotifier(logger log.Logger, receiver internal.Receiver, notifierC
return n, nil
}

func (n *Notifier) SetSentSuccessfulHandler(h *func([]*template.Alert)) {
n.sentSuccessfulHandler = h
}

func (n *Notifier) Notify(ctx context.Context, data *template.Data) error {

mentionedUsers := n.receiver.MentionedUsers
Expand All @@ -113,18 +119,27 @@ func (n *Notifier) Notify(ctx context.Context, data *template.Data) error {
} else {
length = EmbedLimit - len(atUsers) - len(atRoles)
}
messages, _, err := n.tmpl.Split(data, length, n.receiver.TmplName, "", n.logger)
splitData, err := n.tmpl.Split(data, length, n.receiver.TmplName, "", n.logger)
if err != nil {
_ = level.Error(n.logger).Log("msg", "DiscordNotifier: generate message error", "error", err.Error())
return err
}

group := async.NewGroup(ctx)
if n.receiver.Webhook != nil {
for index := range messages {
for index := range splitData {
d := splitData[index]
alerts := d.Alerts
msg := d.Message
group.Add(func(stopCh chan interface{}) {
msg := fmt.Sprintf("%s\n%s%s", messages[index], atUsers, atRoles)
stopCh <- n.sendTo(ctx, msg)
msg := fmt.Sprintf("%s\n%s%s", msg, atUsers, atRoles)
err := n.sendTo(ctx, msg)
if err == nil {
if n.sentSuccessfulHandler != nil {
(*n.sentSuccessfulHandler)(alerts)
}
}
stopCh <- err
})
}
}
Expand Down Expand Up @@ -165,7 +180,7 @@ func (n *Notifier) sendTo(ctx context.Context, content string) error {
return true, err
}
code := resp.StatusCode
level.Debug(n.logger).Log("msg", "DiscordNotifier", "response code:", code)
_ = level.Debug(n.logger).Log("msg", "DiscordNotifier", "response code:", code)

if code != http.StatusNoContent {
return false, fmt.Errorf("DiscordNotifier: send message error, code: %d", code)
Expand Down
15 changes: 14 additions & 1 deletion pkg/notify/notifier/email/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type Notifier struct {
delivery string
// The maximum size of receivers in one email.
maxEmailReceivers int

sentSuccessfulHandler *func([]*template.Alert)
}

func NewEmailNotifier(logger log.Logger, receiver internal.Receiver, notifierCtl *controller.Controller) (notifier.Notifier, error) {
Expand Down Expand Up @@ -126,6 +128,10 @@ func NewEmailNotifier(logger log.Logger, receiver internal.Receiver, notifierCtl
return n, nil
}

func (n *Notifier) SetSentSuccessfulHandler(h *func([]*template.Alert)) {
n.sentSuccessfulHandler = h
}

func (n *Notifier) Notify(ctx context.Context, data *template.Data) error {

sendEmail := func(to, subject, body string) error {
Expand Down Expand Up @@ -165,7 +171,14 @@ func (n *Notifier) Notify(ctx context.Context, data *template.Data) error {
for _, t := range n.receiver.To {
to := t
group.Add(func(stopCh chan interface{}) {
stopCh <- sendEmail(to, subject, body)
err := sendEmail(to, subject, body)
if err == nil {
if n.sentSuccessfulHandler != nil {
(*n.sentSuccessfulHandler)(data.Alerts)
}
}
stopCh <- err

})
}

Expand Down
Loading

0 comments on commit 6955b97

Please sign in to comment.