Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize the logic of notification history #193

Merged
merged 1 commit into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/samples/bundle.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ data:
{{ define "nm.default.message" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ . | message }}{{ end }}
{{ define "nm.default.message.cn" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ .MessageCN }}{{ end }}

{{ define "nm.default.subject" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template "nm.default.message" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ .Status }} alerts{{ if gt (len .GroupLabels.SortedPairs) 1 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }}
{{ define "nm.default.subject" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template "nm.default.message" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ if ne (len .Status) 0 }}{{ .Status }} {{ end }}alerts{{ if gt (len .GroupLabels.SortedPairs) 1 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }}

{{ define "nm.default.text" }}{{ range .Alerts }}{{ template "nm.default.message" . }}
{{ range .Labels.SortedPairs }} {{ .Name | translate }}: {{ .Value }}
Expand Down
2 changes: 1 addition & 1 deletion config/samples/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ data:
{{ define "nm.default.message" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ . | message }}{{ end }}
{{ define "nm.default.message.cn" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ .MessageCN }}{{ end }}

{{ define "nm.default.subject" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template "nm.default.message" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ .Status }} alerts{{ if gt (len .GroupLabels.SortedPairs) 1 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }}
{{ define "nm.default.subject" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template "nm.default.message" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ if ne (len .Status) 0 }}{{ .Status }} {{ end }}alerts{{ if gt (len .GroupLabels.SortedPairs) 1 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }}

{{ define "nm.default.text" }}{{ range .Alerts }}{{ template "nm.default.message" . }}
{{ range .Labels.SortedPairs }} {{ .Name | translate }}: {{ .Value }}
Expand Down
11 changes: 6 additions & 5 deletions helm/templates/post-install.yaml → helm/templates/hooks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
annotations:
"helm.sh/hook": post-install
"helm.sh/hook-weight": "1"
"helm.sh/hook-delete-policy": hook-succeeded,hook-failed
"helm.sh/hook-delete-policy": hook-succeeded,hook-failed,before-hook-creation

---
apiVersion: rbac.authorization.k8s.io/v1
Expand All @@ -15,7 +15,7 @@ metadata:
annotations:
"helm.sh/hook": post-install
"helm.sh/hook-weight": "2"
"helm.sh/hook-delete-policy": hook-succeeded,hook-failed
"helm.sh/hook-delete-policy": hook-succeeded,hook-failed,before-hook-creation
rules:
- apiGroups:
- apiextensions.k8s.io
Expand All @@ -38,7 +38,7 @@ metadata:
annotations:
"helm.sh/hook": post-install
"helm.sh/hook-weight": "3"
"helm.sh/hook-delete-policy": hook-succeeded,hook-failed
"helm.sh/hook-delete-policy": hook-succeeded,hook-failed,before-hook-creation
subjects:
- kind: ServiceAccount
name: {{ .Release.Name }}-post-install
Expand All @@ -56,7 +56,7 @@ metadata:
annotations:
"helm.sh/hook": post-install
"helm.sh/hook-weight": "4"
"helm.sh/hook-delete-policy": hook-succeeded,hook-failed
"helm.sh/hook-delete-policy": hook-succeeded,hook-failed,before-hook-creation
spec:
backoffLimit: {{ .Values.hook.postInstall.backoffLimit }}
template:
Expand All @@ -67,7 +67,8 @@ spec:
serviceAccountName: {{ .Release.Name }}-post-install
containers:
- name: post-install-job
image: "bitnami/kubectl:1.23.6"
image: {{ .Values.hook.postInstall.image.repo }}:{{ .Values.hook.postInstall.image.tag }}
imagePullPolicy: {{ .Values.hook.postInstall.image.pullPolicy }}
command:
- /bin/sh
- -c
Expand Down
2 changes: 1 addition & 1 deletion helm/templates/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ data:
{{ "{{ define \"nm.default.message\" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ . | message }}{{ end }}" }}
{{ "{{ define \"nm.default.message.cn\" }}{{ if ne (len .Status) 0 }}[{{ .Status | translate }}] {{ end }}{{ .MessageCN }}{{ end }}" }}

{{ "{{ define \"nm.default.subject\" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template \"nm.default.message\" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ .Status }} alerts{{ if gt (len .GroupLabels.SortedPairs) 1 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }}" }}
{{ "{{ define \"nm.default.subject\" }}{{ if eq (len .Alerts) 1 }}{{ range .Alerts }}{{ template \"nm.default.message\" . }}{{ end }}{{ else }}{{ .Alerts | len }} {{ if ne (len .Status) 0 }}{{ .Status }} {{ end }}alerts{{ if gt (len .GroupLabels.SortedPairs) 1 }} for {{ range .GroupLabels.SortedPairs }}{{ .Name | translate }}={{ .Value }} {{ end }}{{ end }}{{ end }}{{ end }}" }}

{{ "{{ define \"nm.default.text\" }}{{ range .Alerts }}{{ template \"nm.default.message\" . }}" }}
{{ "{{ range .Labels.SortedPairs }} {{ .Name | translate }}: {{ .Value }}" }}
Expand Down
4 changes: 4 additions & 0 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ kubesphere: false

hook:
postInstall:
image:
repo: kubesphere/kubectl
tag: v1.22.0
pullPolicy: IfNotPresent
backoffLimit: 1

# Set timezone to be injected into containers
Expand Down
14 changes: 11 additions & 3 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,20 @@ func (d *Dispatcher) worker(ctx context.Context, data interface{}, stopCh chan s
pipeline = append(pipeline, aggregation.NewStage(d.notifierCtl))
// Notify stage
pipeline = append(pipeline, notify.NewStage(d.notifierCtl))
// History stage
pipeline = append(pipeline, history.NewStage(d.notifierCtl))

if _, _, err := pipeline.Exec(ctx, d.l, data); err != nil {
_, output, err := pipeline.Exec(ctx, d.l, data)
if err != nil {
_ = level.Error(d.l).Log("msg", "Dispatcher: process alerts failed", "seq", ctx.Value("seq"))
}

go d.execHistoryStage(ctx.Value("seq"), output)

stopCh <- struct{}{}
}

func (d *Dispatcher) execHistoryStage(seq, input interface{}) {
s := history.NewStage(d.notifierCtl)
if _, _, err := s.Exec(context.WithValue(context.Background(), "seq", seq), d.l, input); err != nil {
_ = level.Error(d.l).Log("msg", "Dispatcher: exec history stage failed", "seq", seq)
}
}
22 changes: 6 additions & 16 deletions pkg/history/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/kubesphere/notification-manager/pkg/notify"
"github.com/kubesphere/notification-manager/pkg/stage"
"github.com/kubesphere/notification-manager/pkg/template"
"github.com/kubesphere/notification-manager/pkg/utils"
"github.com/modern-go/reflect2"
)

Expand All @@ -31,7 +30,6 @@ func NewStage(notifierCtl *controller.Controller) stage.Stage {
}

func (s *historyStage) Exec(ctx context.Context, l log.Logger, data interface{}) (context.Context, interface{}, error) {

if reflect2.IsNil(data) {
return ctx, nil, nil
}
Expand All @@ -43,27 +41,19 @@ func (s *historyStage) Exec(ctx context.Context, l log.Logger, data interface{})

_ = level.Debug(l).Log("msg", "Start history stage", "seq", ctx.Value("seq"))

alertMap := data.(map[internal.Receiver][]*template.Data)
m := make(map[string]*template.Alert)
for _, v := range alertMap {
for _, d := range v {
for _, alert := range d.Alerts {
hash := utils.Hash(alert)
m[hash] = alert
}
}
}

input := data.(map[string]*template.Alert)
d := &template.Data{}
for _, v := range m {
d.Alerts = append(d.Alerts, v)
for _, alert := range input {
if alert.NotifySuccessful {
d.Alerts = append(d.Alerts, alert)
}
}

if len(d.Alerts) == 0 {
return ctx, nil, nil
}

alertMap = make(map[internal.Receiver][]*template.Data)
alertMap := make(map[internal.Receiver][]*template.Data)
for _, receiver := range receivers {
alertMap[receiver] = []*template.Data{d}
}
Expand Down
50 changes: 36 additions & 14 deletions pkg/notify/notifier/dingtalk/dingtalk.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type Notifier struct {
conversationThreshold int
conversationUnit time.Duration
conversationMaxWaitTime time.Duration

sentSuccessfulHandler *func([]*template.Alert)
}

type dingtalkText struct {
Expand Down Expand Up @@ -216,6 +218,10 @@ func NewDingTalkNotifier(logger log.Logger, receiver internal.Receiver, notifier
return n, nil
}

func (n *Notifier) SetSentSuccessfulHandler(h *func([]*template.Alert)) {
n.sentSuccessfulHandler = h
}

func (n *Notifier) Notify(ctx context.Context, data *template.Data) error {

group := async.NewGroup(ctx)
Expand Down Expand Up @@ -357,27 +363,35 @@ func (n *Notifier) sendToChatBot(ctx context.Context, data *template.Data) error
maxSize = maxSize - len(atMobiles)
}

messages, titles, err := n.tmpl.Split(data, n.chatbotMessageMaxSize-len(keywords)-len(atMobiles), n.receiver.TmplName, n.receiver.TitleTmplName, n.logger)
splitData, err := n.tmpl.Split(data, n.chatbotMessageMaxSize-len(keywords)-len(atMobiles), n.receiver.TmplName, n.receiver.TitleTmplName, n.logger)
if err != nil {
_ = level.Error(n.logger).Log("msg", "DingTalkNotifier: split message error", "error", err.Error())
return err
}

group := async.NewGroup(ctx)
for index := range messages {
title := titles[index]
msg := fmt.Sprintf("%s%s", messages[index], keywords)
for index := range splitData {
d := splitData[index]
alerts := d.Alerts
title := d.Title
msg := fmt.Sprintf("%s%s", d.Message, keywords)
if n.receiver.TmplType == constants.Markdown {
msg = fmt.Sprintf("%s %s", msg, atMobiles)
}
group.Add(func(stopCh chan interface{}) {
n.throttle.TryAdd(webhook, n.chatbotThreshold, n.chatbotUnit, n.chatbotMaxWaitTime)
if n.throttle.Allow(webhook, n.logger) {
stopCh <- send(title, msg)
} else {
if !n.throttle.Allow(webhook, n.logger) {
_ = level.Error(n.logger).Log("msg", "DingTalkNotifier: message to chatbot dropped because of flow control")
stopCh <- utils.Error("")
}

err := send(title, msg)
if err == nil {
if n.sentSuccessfulHandler != nil {
(*n.sentSuccessfulHandler)(alerts)
}
}
stopCh <- err
})
}

Expand Down Expand Up @@ -486,26 +500,34 @@ func (n *Notifier) sendToConversation(ctx context.Context, data *template.Data)
return nil
}

messages, titles, err := n.tmpl.Split(data, n.conversationMessageMaxSize, n.receiver.TmplName, n.receiver.TitleTmplName, n.logger)
splitData, err := n.tmpl.Split(data, n.conversationMessageMaxSize, n.receiver.TmplName, n.receiver.TitleTmplName, n.logger)
if err != nil {
_ = level.Error(n.logger).Log("msg", "DingTalkNotifier: split message error", "error", err.Error())
return nil
}

group := async.NewGroup(ctx)
for index := range messages {
title := titles[index]
msg := messages[index]
for index := range splitData {
d := splitData[index]
alerts := d.Alerts
title := d.Title
msg := d.Message
for _, chatID := range n.receiver.ChatIDs {
id := chatID
group.Add(func(stopCh chan interface{}) {
n.throttle.TryAdd(appkey, n.conversationThreshold, n.conversationUnit, n.conversationMaxWaitTime)
if n.throttle.Allow(appkey, n.logger) {
stopCh <- send(id, title, msg)
} else {
if !n.throttle.Allow(appkey, n.logger) {
_ = level.Error(n.logger).Log("msg", "DingTalkNotifier: message to conversation dropped because of flow control", "conversation", chatID)
stopCh <- utils.Error("")
}

err := send(id, title, msg)
if err == nil {
if n.sentSuccessfulHandler != nil {
(*n.sentSuccessfulHandler)(alerts)
}
}
stopCh <- err
})
}
}
Expand Down
25 changes: 20 additions & 5 deletions pkg/notify/notifier/discord/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Notifier struct {
timeout time.Duration
logger log.Logger
tmpl *template.Template

sentSuccessfulHandler *func([]*template.Alert)
}
type Message struct {
Content string `json:"content"`
Expand Down Expand Up @@ -89,6 +91,10 @@ func NewDiscordNotifier(logger log.Logger, receiver internal.Receiver, notifierC
return n, nil
}

func (n *Notifier) SetSentSuccessfulHandler(h *func([]*template.Alert)) {
n.sentSuccessfulHandler = h
}

func (n *Notifier) Notify(ctx context.Context, data *template.Data) error {

mentionedUsers := n.receiver.MentionedUsers
Expand All @@ -113,18 +119,27 @@ func (n *Notifier) Notify(ctx context.Context, data *template.Data) error {
} else {
length = EmbedLimit - len(atUsers) - len(atRoles)
}
messages, _, err := n.tmpl.Split(data, length, n.receiver.TmplName, "", n.logger)
splitData, err := n.tmpl.Split(data, length, n.receiver.TmplName, "", n.logger)
if err != nil {
_ = level.Error(n.logger).Log("msg", "DiscordNotifier: generate message error", "error", err.Error())
return err
}

group := async.NewGroup(ctx)
if n.receiver.Webhook != nil {
for index := range messages {
for index := range splitData {
d := splitData[index]
alerts := d.Alerts
msg := d.Message
group.Add(func(stopCh chan interface{}) {
msg := fmt.Sprintf("%s\n%s%s", messages[index], atUsers, atRoles)
stopCh <- n.sendTo(ctx, msg)
msg := fmt.Sprintf("%s\n%s%s", msg, atUsers, atRoles)
err := n.sendTo(ctx, msg)
if err == nil {
if n.sentSuccessfulHandler != nil {
(*n.sentSuccessfulHandler)(alerts)
}
}
stopCh <- err
})
}
}
Expand Down Expand Up @@ -165,7 +180,7 @@ func (n *Notifier) sendTo(ctx context.Context, content string) error {
return true, err
}
code := resp.StatusCode
level.Debug(n.logger).Log("msg", "DiscordNotifier", "response code:", code)
_ = level.Debug(n.logger).Log("msg", "DiscordNotifier", "response code:", code)

if code != http.StatusNoContent {
return false, fmt.Errorf("DiscordNotifier: send message error, code: %d", code)
Expand Down
15 changes: 14 additions & 1 deletion pkg/notify/notifier/email/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type Notifier struct {
delivery string
// The maximum size of receivers in one email.
maxEmailReceivers int

sentSuccessfulHandler *func([]*template.Alert)
}

func NewEmailNotifier(logger log.Logger, receiver internal.Receiver, notifierCtl *controller.Controller) (notifier.Notifier, error) {
Expand Down Expand Up @@ -126,6 +128,10 @@ func NewEmailNotifier(logger log.Logger, receiver internal.Receiver, notifierCtl
return n, nil
}

func (n *Notifier) SetSentSuccessfulHandler(h *func([]*template.Alert)) {
n.sentSuccessfulHandler = h
}

func (n *Notifier) Notify(ctx context.Context, data *template.Data) error {

sendEmail := func(to, subject, body string) error {
Expand Down Expand Up @@ -165,7 +171,14 @@ func (n *Notifier) Notify(ctx context.Context, data *template.Data) error {
for _, t := range n.receiver.To {
to := t
group.Add(func(stopCh chan interface{}) {
stopCh <- sendEmail(to, subject, body)
err := sendEmail(to, subject, body)
if err == nil {
if n.sentSuccessfulHandler != nil {
(*n.sentSuccessfulHandler)(data.Alerts)
}
}
stopCh <- err

})
}

Expand Down
Loading