Skip to content

Commit

Permalink
add more api
Browse files Browse the repository at this point in the history
  • Loading branch information
mylxsw committed Oct 28, 2019
1 parent 4aea1f5 commit 7747762
Show file tree
Hide file tree
Showing 26 changed files with 949 additions and 24 deletions.
102 changes: 102 additions & 0 deletions api/controller/group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package controller

import (
"net/http"

"github.com/mylxsw/adanos-alert/internal/repository"
"github.com/mylxsw/container"
"github.com/mylxsw/hades"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)

type GroupController struct {
cc *container.Container
}

func NewGroupController(cc *container.Container) hades.Controller {
return &GroupController{cc: cc}
}

func (g GroupController) Register(router *hades.Router) {
router.Group("/groups/", func(router *hades.Router) {
router.Get("/", g.Groups).Name("groups:all")
router.Get("/{id}/", g.Group).Name("groups:one")
})
}

type GroupsResp struct {
Groups []repository.MessageGroup `json:"groups"`
Next int64 `json:"next"`
}

// Groups list all message groups
// Arguments:
// - offset/limit
// - status
// - rule_id
func (g GroupController) Groups(ctx hades.Context, groupRepo repository.MessageGroupRepo) (*GroupsResp, error) {
offset := ctx.Int64Input("offset", 0)
limit := ctx.Int64Input("limit", 10)

filter := bson.M{}

status := ctx.Input("status")
if status != "" {
filter["status"] = status
}

ruleID, err := primitive.ObjectIDFromHex(ctx.Input("rule_id"))
if err == nil {
filter["rule._id"] = ruleID
}

grps, next, err := groupRepo.Paginate(filter, offset, limit)
if err != nil {
return nil, hades.WrapJSONError(err, http.StatusInternalServerError)
}

return &GroupsResp{
Groups: grps,
Next: next,
}, nil
}

type GroupResp struct {
Group repository.MessageGroup `json:"group"`
Messages []repository.Message `json:"messages"`
Next int64 `json:"next"`
}

func (g GroupController) Group(
ctx hades.Context,
groupRepo repository.MessageGroupRepo,
messageRepo repository.MessageRepo,
) (*GroupResp, error) {
offset := ctx.Int64Input("offset", 0)
limit := ctx.Int64Input("limit", 10)

groupID, err := primitive.ObjectIDFromHex(ctx.PathVar("id"))
if err != nil {
return nil, hades.WrapJSONError(err, http.StatusUnprocessableEntity)
}

grp, err := groupRepo.Get(groupID)
if err != nil {
return nil, hades.WrapJSONError(err, http.StatusInternalServerError)
}

filter := messagesFilter(ctx)
filter["group_ids"] = bson.M{"$in": groupID}

messages, next, err := messageRepo.Paginate(filter, offset, limit)
if err != nil {
return nil, hades.WrapJSONError(err, http.StatusInternalServerError)
}

return &GroupResp{
Group: grp,
Messages: messages,
Next: next,
}, nil
}
239 changes: 237 additions & 2 deletions api/controller/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ import (
"fmt"
"net/http"
"strconv"
"time"

"github.com/jeremywohl/flatten"
"github.com/mylxsw/adanos-alert/internal/repository"
"github.com/mylxsw/adanos-alert/pkg/template"
"github.com/mylxsw/container"
"github.com/mylxsw/hades"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)

type MessageController struct {
Expand All @@ -22,11 +26,111 @@ func NewMessageController(cc *container.Container) hades.Controller {

func (m *MessageController) Register(router *hades.Router) {
router.Group("/messages", func(router *hades.Router) {
router.Post("/common/", m.AddCommonMessage)
router.Post("/logstash/", m.AddLogstashMessage)
router.Get("/", m.Messages).Name("messages:all")
router.Get("/{id}/", m.Message).Name("messages:one")

router.Post("/common/", m.AddCommonMessage).Name("messages:add:common")
router.Post("/logstash/", m.AddLogstashMessage).Name("messages:add:logstash")
router.Post("/grafana/", m.AddGrafanaMessage).Name("messages:add:grafana")
router.Post("/prometheus/api/v1/alerts", m.AddPrometheusMessage).Name("messages:add:prometheus") // url 地址末尾不包含 "/"
router.Post("/prometheus_alertmanager/", m.AddPrometheusAlertMessage).Name("messages:add:prometheus-alert")
router.Post("/openfalcon/im/", m.AddOpenFalconMessage).Name("messages:add:openfalcon")
})

router.Group("/messages-count/", func(router *hades.Router) {
router.Get("/", m.Count).Name("messages:count")
})
}

// messagesFilter some query conditions for messages
func messagesFilter(ctx hades.Context) bson.M {
filter := bson.M{}

meta := ctx.Input("meta")
if meta != "" {
filter["meta.value"] = meta
}
tag := ctx.Input("tag")
if tag != "" {
filter["tag"] = bson.M{"$in": tag}
}
origin := ctx.Input("origin")
if origin != "" {
filter["origin"] = origin
}
status := ctx.Input("status")
if status != "" {
filter["status"] = status
}

return filter
}

// Count return message count for your conditions
func (m *MessageController) Count(ctx hades.Context, msgRepo repository.MessageRepo) hades.Response {
filter := messagesFilter(ctx)
msgCount, err := msgRepo.Count(filter)
if err != nil {
return ctx.JSONError(err.Error(), http.StatusInternalServerError)
}

return ctx.JSON(hades.M{
"count": msgCount,
})
}

// MessagesResp is a response object for Messages API
type MessagesResp struct {
Messages []repository.Message
Next int64
}

// Messages return all messages
func (m *MessageController) Messages(ctx hades.Context, msgRepo repository.MessageRepo) (*MessagesResp, error) {
offset := ctx.Int64Input("offset", 0)
limit := ctx.Int64Input("limit", 10)

filter := messagesFilter(ctx)
groupIDHex := ctx.Input("group_id")
if groupIDHex != "" {
groupID, err := primitive.ObjectIDFromHex(groupIDHex)
if err != nil {
return nil, hades.WrapJSONError(fmt.Errorf("invalid group_id: %w", err), http.StatusUnprocessableEntity)
}

filter["group_ids"] = bson.M{"$in": groupID}
}

messages, next, err := msgRepo.Paginate(filter, offset, limit)
if err != nil {
return nil, hades.WrapJSONError(fmt.Errorf("query failed: %v", err), http.StatusInternalServerError)
}

return &MessagesResp{
Messages: messages,
Next: next,
}, nil
}

// Message return one message
func (m *MessageController) Message(ctx hades.Context, msgRepo repository.MessageRepo) (*repository.Message, error) {
id, err := primitive.ObjectIDFromHex(ctx.PathVar("id"))
if err != nil {
return nil, hades.WrapJSONError(fmt.Errorf("invalid id: %w", err), http.StatusUnprocessableEntity)
}

message, err := msgRepo.Get(id)
if err != nil {
if err == repository.ErrNotFound {
return nil, hades.WrapJSONError(fmt.Errorf("no such message: %w", err), http.StatusNotFound)
}

return nil, err
}

return &message, nil
}

type RepoMessage interface {
ToRepo() repository.Message
}
Expand Down Expand Up @@ -127,3 +231,134 @@ type GrafanaEvalMatch struct {
Metric string `json:"metric"`
Tags map[string]string
}

func (m *MessageController) AddGrafanaMessage(ctx hades.Context, messageRepo repository.MessageRepo) hades.Response {
var grafanaMessage GrafanaMessage
if err := ctx.Unmarshal(&grafanaMessage); err != nil {
return ctx.JSONError("invalid request", http.StatusUnprocessableEntity)
}

repoMessage := grafanaMessage.ToRepo()
return m.saveMessage(messageRepo, CommonMessage{
Content: repoMessage.Content,
Meta: repoMessage.Meta,
Tags: repoMessage.Tags,
Origin: repoMessage.Origin,
}, ctx)
}

// add prometheus alert message

type PrometheusMessage struct {
Status string `json:"status"`
Labels repository.MessageMeta `json:"labels"`
Annotations repository.MessageMeta `json:"annotations"`
StartsAt time.Time `json:"startsAt"`
EndsAt time.Time `json:"endsAt"`
GeneratorURL string `json:"generatorURL"`
}

func (pm PrometheusMessage) ToRepo() repository.Message {
data, _ := json.Marshal(pm)
return repository.Message{
Content: string(data),
Meta: pm.Labels,
Tags: nil,
Origin: "prometheus",
}
}

func (m *MessageController) AddPrometheusMessage(ctx hades.Context, messageRepo repository.MessageRepo) hades.Response {
var prometheusMessage PrometheusMessage
if err := ctx.Unmarshal(&prometheusMessage); err != nil {
return ctx.JSONError("invalid request", http.StatusUnprocessableEntity)
}

repoMessage := prometheusMessage.ToRepo()
return m.saveMessage(messageRepo, CommonMessage{
Content: repoMessage.Content,
Meta: repoMessage.Meta,
Tags: repoMessage.Tags,
Origin: repoMessage.Origin,
}, ctx)
}

// add prometheus-alert message

type PrometheusAlertMessage struct {
Version string `json:"version"`
GroupKey string `json:"groupKey"`

Receiver string `json:"receiver"`
Status string `json:"status"`
Alerts []PrometheusMessage `json:"alerts"`

GroupLabels repository.MessageMeta `json:"groupLabels"`
CommonLabels repository.MessageMeta `json:"commonLabels"`
CommonAnnotations repository.MessageMeta `json:"commonAnnotations"`

ExternalURL string `json:"externalURL"`
}

func (pam PrometheusAlertMessage) ToRepo() repository.Message {
meta := make(repository.MessageMeta)
for k, v := range pam.GroupLabels {
meta[k] = v
}

for k, v := range pam.CommonLabels {
meta[k] = v
}

meta["status"] = pam.Status

data, _ := json.Marshal(pam)
return repository.Message{
Content: string(data),
Meta: meta,
Tags: nil,
Origin: "prometheus-alert",
}
}

func (m *MessageController) AddPrometheusAlertMessage(ctx hades.Context, messageRepo repository.MessageRepo) hades.Response {
var prometheusMessage PrometheusAlertMessage
if err := ctx.Unmarshal(&prometheusMessage); err != nil {
return ctx.JSONError("invalid request", http.StatusUnprocessableEntity)
}

repoMessage := prometheusMessage.ToRepo()
return m.saveMessage(messageRepo, CommonMessage{
Content: repoMessage.Content,
Meta: repoMessage.Meta,
Tags: repoMessage.Tags,
Origin: repoMessage.Origin,
}, ctx)
}

// add open-falcon message

func (m *MessageController) AddOpenFalconMessage(ctx hades.Context, messageRepo repository.MessageRepo) hades.Response {
tos := ctx.Input("tos")
content := ctx.Input("content")

if content == "" {
return ctx.JSONError("invalid request, content required", http.StatusUnprocessableEntity)
}

meta := make(repository.MessageMeta)
im := template.ParseOpenFalconImMessage(content)
meta["status"] = im.Status
meta["priority"] = strconv.Itoa(im.Priority)
meta["endpoint"] = im.Endpoint
meta["current_step"] = strconv.Itoa(im.CurrentStep)
meta["body"] = im.Body
meta["format_time"] = im.FormatTime

return m.saveMessage(messageRepo, CommonMessage{
Content: content,
Meta: meta,
Tags: []string{tos},
Origin: "open-falcon",
}, ctx)
}
Loading

0 comments on commit 7747762

Please sign in to comment.