Skip to content

Commit

Permalink
add queue manager
Browse files Browse the repository at this point in the history
  • Loading branch information
mylxsw committed Oct 26, 2019
1 parent f33c57c commit 50448c6
Show file tree
Hide file tree
Showing 29 changed files with 883 additions and 142 deletions.
9 changes: 0 additions & 9 deletions api/controller/controller.go

This file was deleted.

129 changes: 129 additions & 0 deletions api/controller/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package controller

import (
"encoding/json"
"fmt"
"net/http"
"strconv"

"github.com/jeremywohl/flatten"
"github.com/mylxsw/adanos-alert/internal/repository"
"github.com/mylxsw/container"
"github.com/mylxsw/hades"
)

type MessageController struct {
cc *container.Container
}

func NewMessageController(cc *container.Container) hades.Controller {
return &MessageController{cc: cc}
}

func (m *MessageController) Register(router *hades.Router) {
router.Group("/messages", func(router *hades.Router) {
router.Post("/common/", m.AddCommonMessage)
router.Post("/logstash/", m.AddLogstashMessage)
})
}

type RepoMessage interface {
ToRepo() repository.Message
}

func (m *MessageController) saveMessage(messageRepo repository.MessageRepo, repoMessage RepoMessage, ctx hades.Context) hades.Response {
id, err := messageRepo.Add(repoMessage.ToRepo())
if err != nil {
return ctx.JSONError(err.Error(), http.StatusInternalServerError)
}

return ctx.JSON(hades.M{
"id": id.String(),
})
}

// Add common message

type CommonMessage struct {
Content string `json:"content"`
Meta repository.MessageMeta `json:"meta"`
Tags []string `json:"tags"`
Origin string `json:"origin"`
}

func (msg CommonMessage) ToRepo() repository.Message {
return repository.Message{
Content: msg.Content,
Meta: msg.Meta,
Tags: msg.Tags,
Origin: msg.Origin,
}
}

func (m *MessageController) AddCommonMessage(ctx hades.Context, messageRepo repository.MessageRepo) hades.Response {
var commonMessage CommonMessage
if err := ctx.Unmarshal(&commonMessage); err != nil {
return ctx.JSONError("invalid request", http.StatusUnprocessableEntity)
}

return m.saveMessage(messageRepo, commonMessage, ctx)
}

// Add logstash message

func (m *MessageController) AddLogstashMessage(ctx hades.Context, messageRepo repository.MessageRepo) hades.Response {
flattenJson, err := flatten.FlattenString(string(ctx.Request().Body()), "", flatten.DotStyle)
if err != nil {
return ctx.JSONError(fmt.Sprintf("invalid json: %s", err), http.StatusUnprocessableEntity)
}

var meta repository.MessageMeta
if err := json.Unmarshal([]byte(flattenJson), &meta); err != nil {
return ctx.JSONError(fmt.Sprintf("parse json failed: %s", err), http.StatusInternalServerError)
}

msg := meta["message"]
delete(meta, "message")

return m.saveMessage(messageRepo, CommonMessage{
Content: msg,
Meta: meta,
Tags: nil,
Origin: "logstash",
}, ctx)
}

// Add grafana message

type GrafanaMessage struct {
EvalMatches []GrafanaEvalMatch `json:"evalMatches"`
ImageUrl string `json:"imageUrl"`
Message string `json:"message"`
RuleID int64 `json:"ruleId"`
RuleName string `json:"ruleName"`
RuleUrl string `json:"ruleUrl"`
State string `json:"state"`
Title string `json:"title"`
}

func (g GrafanaMessage) ToRepo() repository.Message {
message, _ := json.Marshal(g)

return repository.Message{
Content: string(message),
Meta: repository.MessageMeta{
"rule_id": strconv.Itoa(int(g.RuleID)),
"rule_name": g.RuleName,
"state": g.State,
"title": g.Title,
},
Tags: nil,
Origin: "grafana",
}
}

type GrafanaEvalMatch struct {
Value float64 `json:"value"`
Metric string `json:"metric"`
Tags map[string]string
}
4 changes: 2 additions & 2 deletions api/controller/welcome.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type WelcomeController struct {
cc *container.Container
}

func NewWelcomeController(cc *container.Container) Controller {
func NewWelcomeController(cc *container.Container) hades.Controller {
return &WelcomeController{cc: cc}
}

Expand All @@ -25,6 +25,6 @@ type WelcomeMessage struct {
// @Summary 欢迎页面,API版本信息
// @Success 200 {object} controller.WelcomeMessage
// @Router / [get]
func (w *WelcomeController) Home(ctx *hades.WebContext, req *hades.HttpRequest) WelcomeMessage {
func (w *WelcomeController) Home(ctx hades.Context, req hades.Request) WelcomeMessage {
return WelcomeMessage{Version: w.cc.MustGet("version").(string)}
}
11 changes: 7 additions & 4 deletions api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func routers(cc *container.Container) func(router *hades.Router, mw hades.Reques
conf := cc.MustGet(&configs.Config{}).(*configs.Config)
return func(router *hades.Router, mw hades.RequestMiddleware) {
mws := make([]hades.HandlerDecorator, 0)
mws = append(mws, mw.AccessLog(), mw.CORS("*"), mw.JSONExceptionHandler())
mws = append(mws, mw.AccessLog(), mw.CORS("*"))
if conf.APIToken != "" {
authMiddleware := mw.AuthHandler(func(typ string, credential string) error {
if typ != "Bearer" {
Expand All @@ -29,8 +29,11 @@ func routers(cc *container.Container) func(router *hades.Router, mw hades.Reques

mws = append(mws, authMiddleware)
}
router.Group("/api", func(router *hades.Router) {
controller.NewWelcomeController(cc).Register(router)
}, mws...)

router.WithMiddleware(mws...).Controllers(
"/api",
controller.NewWelcomeController(cc),
controller.NewMessageController(cc),
)
}
}
77 changes: 42 additions & 35 deletions cmd/adanos-alert/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"github.com/mylxsw/adanos-alert/configs"
"github.com/mylxsw/adanos-alert/internal/action"
"github.com/mylxsw/adanos-alert/internal/job"
"github.com/mylxsw/adanos-alert/internal/queue"
"github.com/mylxsw/adanos-alert/internal/repository/impl"
"github.com/mylxsw/asteria/formatter"
"github.com/mylxsw/asteria/level"
"github.com/mylxsw/asteria/log"
"github.com/mylxsw/asteria/writer"
Expand Down Expand Up @@ -45,32 +45,48 @@ func main() {
Value: "adanos-alert",
}))
app.AddFlags(altsrc.NewStringFlag(cli.StringFlag{
Name: "api_token",
Usage: "API Token for api access control",
Value: "",
}))
app.AddFlags(altsrc.NewBoolTFlag(cli.BoolTFlag{
Name: "console_color",
Usage: "log colorful for console",
Name: "api_token",
Usage: "API Token for api access control",
EnvVar: "ADANOS_API_TOKEN",
Value: "",
}))
app.AddFlags(altsrc.NewBoolFlag(cli.BoolFlag{
Name: "use_local_dashboard",
Usage: "whether using local dashboard, this is used when development",
}))
app.AddFlags(altsrc.NewStringFlag(cli.StringFlag{
Name: "aggregation_period",
Usage: "aggregation job execute period",
Value: "30s",
Name: "aggregation_period",
Usage: "aggregation job execute period",
EnvVar: "ADANOS_AGGREGATION_PERIOD",
Value: "30s",
}))
app.AddFlags(altsrc.NewStringFlag(cli.StringFlag{
Name: "action_trigger_period",
Usage: "action trigger job execute period",
Value: "15s",
Name: "action_trigger_period",
Usage: "action trigger job execute period",
EnvVar: "ADANOS_ACTION_TRIGGER_PERIOD",
Value: "15s",
}))
app.AddFlags(altsrc.NewIntFlag(cli.IntFlag{
Name: "queue_job_max_retry_times",
Usage: "set queue job max retry times",
EnvVar: "ADANOS_QUEUE_JOB_MAX_RETRY_TIMES",
Value: 3,
}))
app.AddFlags(altsrc.NewIntFlag(cli.IntFlag{
Name: "queue_worker_num",
Usage: "set queue worker numbers",
EnvVar: "ADANOS_QUEUE_WORKER_NUM",
Value: 3,
}))

app.BeforeInitialize(func(c *cli.Context) error {
log.DefaultLogFormatter(formatter.NewDefaultFormatter(c.Bool("console_color")))
return nil
app.UseStackLogger(func(stackWriter *writer.StackWriter) {
stackWriter.PushWithLevels(writer.NewStdoutWriter())
stackWriter.PushWithLevels(
NewErrorCollectorWriter(),
level.Error,
level.Emergency,
level.Critical,
)
})

app.Singleton(func(c *cli.Context) *configs.Config {
Expand All @@ -87,13 +103,14 @@ func main() {
}

return &configs.Config{
MongoURI: c.String("mongo_uri"),
MongoDB: c.String("mongo_db"),
UseLocalDashboard: c.Bool("use_local_dashboard"),
ConsoleColor: c.Bool("console_color"),
APIToken: c.String("api_token"),
AggregationPeriod: aggregationPeriod,
ActionTriggerPeriod: actionTriggerPeriod,
MongoURI: c.String("mongo_uri"),
MongoDB: c.String("mongo_db"),
UseLocalDashboard: c.Bool("use_local_dashboard"),
APIToken: c.String("api_token"),
AggregationPeriod: aggregationPeriod,
ActionTriggerPeriod: actionTriggerPeriod,
QueueJobMaxRetryTimes: c.Int("queue_job_max_retry_times"),
QueueWorkerNum: c.Int("queue_worker_num"),
}
})

Expand All @@ -113,17 +130,6 @@ func main() {
})

app.Main(func(conf *configs.Config) {
stackWriter := writer.NewStackWriter()
stackWriter.PushWithLevels(writer.NewStdoutWriter())
stackWriter.PushWithLevels(
NewErrorCollectorWriter(),
level.Error,
level.Emergency,
level.Critical,
)

log.All().LogWriter(stackWriter)

log.WithFields(log.Fields{
"config": conf,
}).Debug("configuration")
Expand All @@ -133,6 +139,7 @@ func main() {
app.Provider(impl.ServiceProvider{})
app.Provider(api.ServiceProvider{})
app.Provider(job.ServiceProvider{})
app.Provider(queue.ServiceProvider{})

if err := app.Run(os.Args); err != nil {
log.Errorf("exit with error: %s", err)
Expand Down
15 changes: 8 additions & 7 deletions configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
)

type Config struct {
MongoURI string
MongoDB string
APIToken string
UseLocalDashboard bool
ConsoleColor bool
MongoURI string `json:"mongo_uri"`
MongoDB string `json:"mongo_db"`
APIToken string `json:"api_token"`
UseLocalDashboard bool `json:"use_local_dashboard"`

AggregationPeriod time.Duration
ActionTriggerPeriod time.Duration
AggregationPeriod time.Duration `json:"aggregation_period"`
ActionTriggerPeriod time.Duration `json:"action_trigger_period"`
QueueJobMaxRetryTimes int `json:"queue_job_max_retry_times"`
QueueWorkerNum int `json:"queue_worker_num"`
}

func (conf *Config) Serialize() string {
Expand Down
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ require (
github.com/golang/snappy v0.0.1 // indirect
github.com/google/go-cmp v0.3.0 // indirect
github.com/gorilla/mux v1.7.3
github.com/jeremywohl/flatten v0.0.0-20190921043622-d936035e55cf
github.com/mylxsw/asteria v0.0.0-20190818103215-f64f703d8999
github.com/mylxsw/coll v0.0.0-20190810120926-a7a6f0f4bae8
github.com/mylxsw/container v0.0.0-20190810122756-1fd99a90f7c6
github.com/mylxsw/glacier v0.0.0-20190916101037-6d7b39368699
github.com/mylxsw/container v0.0.0-20191022090726-425f1756accf
github.com/mylxsw/glacier v0.0.0-20191022103403-b8923fea418b
github.com/mylxsw/go-toolkit v0.0.0-20190919070831-69b6f1e50428
github.com/mylxsw/hades v0.0.0-20190908103158-385ba1d43475
github.com/mylxsw/hades v0.0.0-20191022102414-c812ba19d1d3
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.4.0
github.com/swaggo/files v0.0.0-20190704085106-630677cd5c14 // indirect
github.com/swaggo/http-swagger v0.0.0-20190614090009-c2865af9083e
Expand All @@ -31,5 +33,4 @@ require (
golang.org/x/net v0.0.0-20191007182048-72f939374954 // indirect
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e // indirect
golang.org/x/tools v0.0.0-20191007185444-6536af71d98a // indirect
gopkg.in/yaml.v2 v2.2.4 // indirect
)
16 changes: 16 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma
github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -53,8 +54,16 @@ github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/schema v1.1.0 h1:CamqUDOFUBqzrvxuz2vEwo8+SUdwsluFh7IlzJh30LY=
github.com/gorilla/schema v1.1.0/go.mod h1:kgLaKoK1FELgZqMAVxx/5cbj0kT+57qxUrAlIO2eleU=
github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.0 h1:S7P+1Hm5V/AT9cjEcUD5uDaQSX0OE577aCXgoaKpYbQ=
github.com/gorilla/sessions v1.2.0/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jeremywohl/flatten v0.0.0-20190921043622-d936035e55cf h1:Ut4tTtPNmInWiEWJRernsWm688R0RN6PFO8sZhwI0sk=
github.com/jeremywohl/flatten v0.0.0-20190921043622-d936035e55cf/go.mod h1:4AmD/VxjWcI5SRB0n6szE2A6s2fsNHDLO0nAlMHgfLQ=
github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62Fo=
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
Expand Down Expand Up @@ -84,8 +93,13 @@ github.com/mylxsw/coll v0.0.0-20190810120926-a7a6f0f4bae8 h1:TtxSw54bx34zGgs7Y/V
github.com/mylxsw/coll v0.0.0-20190810120926-a7a6f0f4bae8/go.mod h1:Ugpjgv7bOSn1NXiPNHl92DdCGP2siWk50irFSyI+Hf8=
github.com/mylxsw/container v0.0.0-20190810122756-1fd99a90f7c6 h1:wA8rXGj4ajvUHOPY3LHp1MQlB1v46J24PyaPG7MmxE8=
github.com/mylxsw/container v0.0.0-20190810122756-1fd99a90f7c6/go.mod h1:v2QwNL+V2nI1o7naopTXXalpa1Y6b5E8lCwwANiYfyc=
github.com/mylxsw/container v0.0.0-20191022071657-dd26369c6bb9/go.mod h1:v2QwNL+V2nI1o7naopTXXalpa1Y6b5E8lCwwANiYfyc=
github.com/mylxsw/container v0.0.0-20191022090726-425f1756accf h1:Fh5wpFUHnxWTiqrc7D+cKpSpIsQgICwpokK2nBj94LQ=
github.com/mylxsw/container v0.0.0-20191022090726-425f1756accf/go.mod h1:v2QwNL+V2nI1o7naopTXXalpa1Y6b5E8lCwwANiYfyc=
github.com/mylxsw/glacier v0.0.0-20190916101037-6d7b39368699 h1:jWaOKY0o6wrEoCah7jSBtj4arQgwwveOX22XHGO8GHQ=
github.com/mylxsw/glacier v0.0.0-20190916101037-6d7b39368699/go.mod h1:8RWGNKzQ6DIk2AnifICQhdeapZYRkmSoP0yv3kVu0Mo=
github.com/mylxsw/glacier v0.0.0-20191022103403-b8923fea418b h1:3iLEA90GmVCVDbLt+BbtbsjVHM8YngiH0MOQ7TrCKU4=
github.com/mylxsw/glacier v0.0.0-20191022103403-b8923fea418b/go.mod h1:h4qLgoj5NOG4AwXQO9gvddD5KAiMlTSNJNONXHBwunI=
github.com/mylxsw/go-toolkit v0.0.0-20190810132703-486dd5ff72a9 h1:RgHkiDyU7XDmZqk0Cpqnfn2NmYpginMI69RX0BOzKis=
github.com/mylxsw/go-toolkit v0.0.0-20190810132703-486dd5ff72a9/go.mod h1:s2TWTsAxqwsOr1+r19Fv7bVyLlsTZokRhNFOegKpLrE=
github.com/mylxsw/go-toolkit v0.0.0-20190919070831-69b6f1e50428 h1:o3dalCkzhjT68fH/e9xahtRpiLJnhs4cRacrkCl96xY=
Expand All @@ -94,6 +108,8 @@ github.com/mylxsw/graceful v0.0.0-20190810132432-36a0395cf382 h1:zybIHlDUFp83gs1
github.com/mylxsw/graceful v0.0.0-20190810132432-36a0395cf382/go.mod h1:Skt4GaATnUdq3MArrHK32UsSC9tq8GB0ZToWkF420Os=
github.com/mylxsw/hades v0.0.0-20190908103158-385ba1d43475 h1:RvmNk1D/D6F/Ts+t+210DgOO5bhjWEenegsADnX2ypY=
github.com/mylxsw/hades v0.0.0-20190908103158-385ba1d43475/go.mod h1:w9ekIRDu0QvS10xbEeJiEkDsNa3OOV33dYBz1Mzx5nU=
github.com/mylxsw/hades v0.0.0-20191022102414-c812ba19d1d3 h1:4UxnFrohUvmn+DamENMj6tpThMZY7WLUry54S9swf1g=
github.com/mylxsw/hades v0.0.0-20191022102414-c812ba19d1d3/go.mod h1:wbRVmQL7e21irc1poKY2dRTE0hL+bG8mz5liuMTsKTs=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
Loading

0 comments on commit 50448c6

Please sign in to comment.