Skip to content

Commit

Permalink
增加分组聚合支持,增加 Meta 字段排序函数,修复时间范围报警频率bug
Browse files Browse the repository at this point in the history
  • Loading branch information
mylxsw committed Aug 19, 2020
1 parent 9c15f23 commit c271df9
Show file tree
Hide file tree
Showing 18 changed files with 1,695 additions and 1,384 deletions.
10 changes: 10 additions & 0 deletions api/controller/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type RuleForm struct {
Description string `json:"description"`
Tags []string `json:"tags"`

AggregateRule string `json:"aggregate_rule"`

ReadyType string `json:"ready_type"`
Interval int64 `json:"interval"`
DailyTimes []string `json:"daily_times"`
Expand Down Expand Up @@ -192,6 +194,10 @@ func (r RuleForm) Validate(req web.Request) error {
}
}

if _, err := matcher.NewMessageFinger(r.AggregateRule); err != nil {
return fmt.Errorf("group rule is invalid")
}

return nil
}

Expand All @@ -207,6 +213,8 @@ func (r RuleController) Check(ctx web.Context) web.Response {
_, err = matcher.NewTriggerMatcher(repository.Trigger{PreCondition: content})
case repository.TemplateTypeTemplate:
_, err = template.CreateParser(content)
case "aggregate_rule":
_, err = matcher.NewMessageFinger(content)
}

if err != nil {
Expand Down Expand Up @@ -257,6 +265,7 @@ func (r RuleController) Add(ctx web.Context, repo repository.RuleRepo, manager a
Interval: ruleForm.Interval,
TimeRanges: ruleForm.TimeRanges,
Rule: ruleForm.Rule,
AggregateRule: ruleForm.AggregateRule,
Template: ruleForm.Template,
SummaryTemplate: ruleForm.SummaryTemplate,
Triggers: triggers,
Expand Down Expand Up @@ -325,6 +334,7 @@ func (r RuleController) Update(ctx web.Context, ruleRepo repository.RuleRepo, ma
Interval: ruleForm.Interval,
TimeRanges: ruleForm.TimeRanges,
Rule: ruleForm.Rule,
AggregateRule: ruleForm.AggregateRule,
Template: ruleForm.Template,
SummaryTemplate: ruleForm.SummaryTemplate,
Triggers: triggers,
Expand Down
2,734 changes: 1,376 additions & 1,358 deletions api/static.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions api/view/groups.html
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ <h2>{{ .Group.UpdatedAt | datetime }} - {{ .Group.Rule.Name }}</h2>
<th>来源</th>
<td>{{ $msg.Origin }}</td>
</tr>
{{ range $i, $m := $msg.Meta }}
{{ range $i, $m := sort_map_human $msg.Meta }}
<tr class="adanos-can-fold">
<th>{{ $i }}</th>
<td><pre style="margin: 0; padding: 0; line-height: 1.5;">{{ format "%v" $m | remove_empty_line }}</pre></td>
<th>{{ $m.Key }}</th>
<td><pre style="margin: 0; padding: 0; line-height: 1.5;">{{ format "%v" $m.Value | remove_empty_line }}</pre></td>
</tr>
{{ end }}
<tr>
Expand Down
7 changes: 3 additions & 4 deletions api/view/template.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package view


var defaultMessageViewTemplate = `<html>
<head>
<title>{{ .Group.Rule.Name }}</title>
Expand Down Expand Up @@ -99,10 +98,10 @@ var defaultMessageViewTemplate = `<html>
<th>来源</th>
<td>{{ $msg.Origin }}</td>
</tr>
{{ range $i, $m := $msg.Meta }}
{{ range $i, $m := sort_map_human $msg.Meta }}
<tr class="adanos-can-fold">
<th>{{ $i }}</th>
<td><pre style="margin: 0; padding: 0; line-height: 1.5;">{{ format "%v" $m | remove_empty_line }}</pre></td>
<th>{{ $m.Key }}</th>
<td><pre style="margin: 0; padding: 0; line-height: 1.5;">{{ format "%v" $m.Value | remove_empty_line }}</pre></td>
</tr>
{{ end }}
<tr>
Expand Down
1 change: 1 addition & 0 deletions dashboard/src/components/TemplateHelp.vue
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Group: {
ID primitive.ObjectID
SeqNum int64
MessageCount int64
AggregateKey string
Rule {
ID primitive.ObjectID
Name string
Expand Down
1 change: 1 addition & 0 deletions dashboard/src/components/TriggerHelp.vue
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Group {
ID primitive.ObjectID
SeqNum int64
MessageCount int64
AggregateKey string
Rule {
ID primitive.ObjectID
Name string
Expand Down
1 change: 1 addition & 0 deletions dashboard/src/views/Groups.vue
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<b-link :to="'/rules/' + row.item.rule.id + '/edit'" target="_blank" class="ml-2">
<font-awesome-icon icon="external-link-alt"></font-awesome-icon>
</b-link>
<p v-if="row.item.aggregate_key !== ''"><b-badge v-b-tooltip.hover title="聚合条件(Key)">{{ row.item.aggregate_key }}</b-badge></p>
</template>
<template v-slot:cell(status)="row">
<b-badge v-if="row.item.status === 'collecting'" variant="dark" :title="'预计' + formatted(row.item.rule.expect_ready_at) + '完成'" v-b-tooltip.hover>收集中
Expand Down
26 changes: 26 additions & 0 deletions dashboard/src/views/RuleEdit.vue
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@
target="_blank">https://github.com/antonmedv/expr/blob/master/docs/Language-Definition.md</a>
</small>
<MatchRuleHelp v-if="rule_help" :helpers="helper.groupMatchRules"/>
<b-form-group class="mt-4" label-cols="2" label="聚合条件(可选)" label-for="aggregate_cond_input" description="聚合条件表达式语法与匹配规则一致,用于对符合匹配规则的一组消息按照某个可变值分组,类似于 SQL 中的 GroupBy">
<b-input-group>
<b-form-input id="aggregate_cond_input" placeholder="输入聚合条件"
v-model="form.aggregate_rule"/>

<b-input-group-append>
<b-btn variant="primary" @click="checkAggregateRule(form.aggregate_rule)">检查</b-btn>
</b-input-group-append>
</b-input-group>
</b-form-group>
</b-card>
</b-card-group>

Expand Down Expand Up @@ -374,6 +384,7 @@ let helpers = {
{text: "Group.ID", displayText: ""},
{text: "Group.SeqNum", displayText: ""},
{text: "Group.MessageCount", displayText: ""},
{text: "Group.AggregateKey", displayText: ""},
{text: "Group.Rule", displayText: ""},
{text: "Group.Rule.ID", displayText: ""},
{text: "Group.Rule.Name", displayText: ""},
Expand Down Expand Up @@ -601,6 +612,7 @@ export default {
name: '',
description: '',
tags: [],
aggregate_rule: '',
ready_type: 'interval',
daily_times: ['09:00:00'],
time_ranges: [
Expand Down Expand Up @@ -741,6 +753,18 @@ export default {
this.sendCheckRequest('template', template.trim());
},
/**
* 检查模板是否合法
*/
checkAggregateRule(content) {
if (content.trim() === '') {
this.ErrorBox('分组规则为空,无需检查');
return;
}
this.sendCheckRequest('aggregate_rule', content.trim());
},
/**
* 发送规则检查请求
*/
Expand Down Expand Up @@ -914,6 +938,7 @@ export default {
requestData.description = this.form.description;
requestData.rule = this.form.rule;
requestData.tags = this.form.tags;
requestData.aggregate_rule = this.form.aggregate_rule;
requestData.template = this.form.template;
requestData.triggers = this.form.triggers.map(function (value) {
value.meta = JSON.stringify(value.meta_arr);
Expand Down Expand Up @@ -958,6 +983,7 @@ export default {
this.form.daily_times = (response.data.daily_times === null || response.data.daily_times.length === 0) ? ['09:00:00'] : response.data.daily_times;
this.form.rule = response.data.rule;
this.form.tags = response.data.tags;
this.form.aggregate_rule = response.data.aggregate_rule;
this.form.template = response.data.template;
if (response.data.time_ranges === null || response.data.time_ranges.length === 0) {
Expand Down
32 changes: 26 additions & 6 deletions internal/job/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/mylxsw/coll"
"github.com/mylxsw/container"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)

const AggregationJobName = "aggregation"
Expand Down Expand Up @@ -39,7 +38,7 @@ func (a *AggregationJob) groupingMessages(msgRepo repository.MessageRepo, groupR
return err
}

collectingGroups := make(map[primitive.ObjectID]repository.MessageGroup)
collectingGroups := make(map[string]repository.MessageGroup)
err = msgRepo.Traverse(bson.M{"status": repository.MessageStatusPending}, func(msg repository.Message) error {
for _, m := range matchers {
matched, err := m.Match(msg)
Expand All @@ -49,8 +48,10 @@ func (a *AggregationJob) groupingMessages(msgRepo repository.MessageRepo, groupR

// if the message matched a rule, update message's group_id and skip to next message
if matched {
if _, ok := collectingGroups[m.Rule().ID]; !ok {
grp, err := groupRepo.CollectingGroup(m.Rule().ToGroupRule())
aggregateKey := buildMessageFinger(m.Rule().AggregateRule, msg)
key := fmt.Sprintf("%s:%s", m.Rule().ID.Hex(), aggregateKey)
if _, ok := collectingGroups[key]; !ok {
grp, err := groupRepo.CollectingGroup(m.Rule().ToGroupRule(aggregateKey))
if err != nil {
log.WithFields(log.Fields{
"msg": msg,
Expand All @@ -60,10 +61,10 @@ func (a *AggregationJob) groupingMessages(msgRepo repository.MessageRepo, groupR
return err
}

collectingGroups[m.Rule().ID] = grp
collectingGroups[key] = grp
}

msg.GroupID = append(msg.GroupID, collectingGroups[m.Rule().ID].ID)
msg.GroupID = append(msg.GroupID, collectingGroups[key].ID)
msg.Status = repository.MessageStatusGrouped
}
}
Expand Down Expand Up @@ -151,3 +152,22 @@ func (a *AggregationJob) pendingMessageGroup(groupRepo repository.MessageGroupRe
return groupRepo.UpdateID(grp.ID, grp)
})
}

func buildMessageFinger(groupRule string, msg repository.Message) string {
finger, err := matcher.NewMessageFinger(groupRule)
if err != nil {
log.WithFields(log.Fields{
"rule": groupRule,
}).Errorf("parse group rule failed: %v", err)
return "[error]invalid_rule"
}
groupKey, err := finger.Run(msg)
if err != nil {
log.WithFields(log.Fields{
"rule": groupRule,
}).Errorf("rule group failed: %v", err)
return "[error]parse_failed"
}

return groupKey
}
44 changes: 44 additions & 0 deletions internal/matcher/message_finger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package matcher

import (
"fmt"

"github.com/antonmedv/expr"
"github.com/antonmedv/expr/vm"
"github.com/mylxsw/adanos-alert/internal/repository"
)

type MessageFinger struct {
expr string
program *vm.Program
}

func NewMessageFinger(fingerExpr string) (*MessageFinger, error) {
if fingerExpr == "" {
fingerExpr = `""`
}

program, err := expr.Compile(fingerExpr, expr.Env(&MessageWrap{}))
if err != nil {
return nil, err
}

return &MessageFinger{
expr: fingerExpr,
program: program,
}, nil
}

// Run 根据指定的表达式创建 message 的指纹
func (m *MessageFinger) Run(msg repository.Message) (string, error) {
result, err := expr.Run(m.program, NewMessageWrap(msg))
if err != nil {
return "", err
}

if result == nil {
return "", nil
}

return fmt.Sprintf("%v", result), nil
}
80 changes: 80 additions & 0 deletions internal/matcher/message_finger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package matcher_test

import (
"testing"
"time"

"github.com/mylxsw/adanos-alert/internal/matcher"
"github.com/mylxsw/adanos-alert/internal/repository"
"github.com/stretchr/testify/assert"
"go.mongodb.org/mongo-driver/bson/primitive"
)

func TestMessageFinger(t *testing.T) {
var msg = repository.Message{
ID: primitive.NewObjectID(),
Content: `{"log_level": "debug", "message": "request", "context": {"user_id": 123}}`,
Meta: repository.MessageMeta{
"environment": "dev",
"server": "192.168.1.1",
},
Tags: []string{"php", "nodejs"},
Origin: "Filebeat",
CreatedAt: time.Now(),
}

{
f, err := matcher.NewMessageFinger(`Meta["server"] + ":" + Origin`)
assert.NoError(t, err)

finger, err := f.Run(msg)
assert.NoError(t, err)
assert.Equal(t, "192.168.1.1:Filebeat", finger)
}

{

f, err := matcher.NewMessageFinger(``)
assert.NoError(t, err)

finger, err := f.Run(msg)
assert.NoError(t, err)
assert.Equal(t, "", finger)
}

{
f, err := matcher.NewMessageFinger(`"hello world"`)
assert.NoError(t, err)

finger, err := f.Run(msg)
assert.NoError(t, err)
assert.Equal(t, "hello world", finger)
}

{
f, err := matcher.NewMessageFinger(`Meta["not_exist_key"]`)
assert.NoError(t, err)

finger, err := f.Run(msg)
assert.NoError(t, err)
assert.Equal(t, "", finger)
}

{
f, err := matcher.NewMessageFinger(`124`)
assert.NoError(t, err)

finger, err := f.Run(msg)
assert.NoError(t, err)
assert.Equal(t, "124", finger)
}

{
f, err := matcher.NewMessageFinger(`true`)
assert.NoError(t, err)

finger, err := f.Run(msg)
assert.NoError(t, err)
assert.Equal(t, "true", finger)
}
}
4 changes: 3 additions & 1 deletion internal/repository/impl/message_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (m MessageGroupRepo) Count(filter bson.M) (int64, error) {
func (m MessageGroupRepo) CollectingGroup(rule repository.MessageGroupRule) (group repository.MessageGroup, err error) {
err = m.col.FindOneAndUpdate(
context.TODO(),
bson.M{"rule._id": rule.ID, "status": repository.MessageGroupStatusCollecting},
bson.M{"rule._id": rule.ID, "rule.aggregate_key": rule.AggregateKey, "status": repository.MessageGroupStatusCollecting},
bson.M{"$set": bson.M{"status": repository.MessageGroupStatusCollecting}},
options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After),
).Decode(&group)
Expand All @@ -160,6 +160,8 @@ func (m MessageGroupRepo) CollectingGroup(rule repository.MessageGroupRule) (gro
}

group.Rule = rule
group.AggregateKey = rule.AggregateKey

_ = m.UpdateID(group.ID, group)
}

Expand Down
Loading

0 comments on commit c271df9

Please sign in to comment.