Skip to content

Commit

Permalink
bugfix for prometheus message
Browse files Browse the repository at this point in the history
  • Loading branch information
mylxsw committed Jul 16, 2020
1 parent da15c78 commit 6582c51
Show file tree
Hide file tree
Showing 6 changed files with 1,198 additions and 1,144 deletions.
37 changes: 27 additions & 10 deletions agent/api/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (m *MessageController) Register(router *web.Router) {
})
}

func (m *MessageController) saveMessage(msgRepo store.MessageStore, commonMessage misc.CommonMessage, ctx web.Context) web.Response {
func (m *MessageController) saveMessage(msgRepo store.MessageStore, commonMessage misc.CommonMessage, ctx web.Context) error {
commonMessage.Meta["adanos_agent_version"] = m.cc.MustGet(glacier.VersionKey).(string)
commonMessage.Meta["adanos_agent_ip"] = misc.ServerIP()
m.cc.MustResolve(func(db *ledis.DB) {
Expand All @@ -46,7 +46,15 @@ func (m *MessageController) saveMessage(msgRepo store.MessageStore, commonMessag

if err := msgRepo.Enqueue(&req); err != nil {
log.Warningf("本地存储失败: %s", err)
return ctx.JSONError(fmt.Sprintf("本地存储写入失败:%v", err), http.StatusInternalServerError)
return err
}

return nil
}

func (m *MessageController) errorWrap(ctx web.Context, err error) web.Response {
if err != nil {
return ctx.JSONError(err.Error(), http.StatusInternalServerError)
}

return ctx.JSON(struct{}{})
Expand All @@ -58,7 +66,7 @@ func (m *MessageController) AddCommonMessage(ctx web.Context, messageStore store
return ctx.JSONError(fmt.Sprintf("invalid request: %v", err), http.StatusUnprocessableEntity)
}

return m.saveMessage(messageStore, commonMessage, ctx)
return m.errorWrap(ctx, m.saveMessage(messageStore, commonMessage, ctx))
}

// AddLogstashMessage Add logstash message
Expand All @@ -68,7 +76,7 @@ func (m *MessageController) AddLogstashMessage(ctx web.Context, messageStore sto
return ctx.JSONError(err.Error(), http.StatusInternalServerError)
}

return m.saveMessage(messageStore, *commonMessage, ctx)
return m.errorWrap(ctx, m.saveMessage(messageStore, *commonMessage, ctx))
}

// Add grafana message
Expand All @@ -78,17 +86,26 @@ func (m *MessageController) AddGrafanaMessage(ctx web.Context, messageStore stor
return ctx.JSONError(err.Error(), http.StatusInternalServerError)
}

return m.saveMessage(messageStore, *commonMessage, ctx)
return m.errorWrap(ctx, m.saveMessage(messageStore, *commonMessage, ctx))
}

// add prometheus alert message
func (m *MessageController) AddPrometheusMessage(ctx web.Context, messageStore store.MessageStore) web.Response {
commonMessage, err := misc.PrometheusToCommonMessage(ctx.Request().Body())
commonMessages, err := misc.PrometheusToCommonMessages(ctx.Request().Body())
if err != nil {
return ctx.JSONError(err.Error(), http.StatusInternalServerError)
return m.errorWrap(ctx, err)
}

for _, cm := range commonMessages {
if err := m.saveMessage(messageStore, *cm, ctx); err != nil {
log.WithFields(log.Fields{
"message": cm,
}).Errorf("save prometheus message failed: %v", err)
continue
}
}

return m.saveMessage(messageStore, *commonMessage, ctx)
return m.errorWrap(ctx, nil)
}

// add prometheus-alert message
Expand All @@ -98,7 +115,7 @@ func (m *MessageController) AddPrometheusAlertMessage(ctx web.Context, messageSt
return ctx.JSONError(err.Error(), http.StatusInternalServerError)
}

return m.saveMessage(messageStore, *commonMessage, ctx)
return m.errorWrap(ctx, m.saveMessage(messageStore, *commonMessage, ctx))
}

// add open-falcon message
Expand All @@ -110,5 +127,5 @@ func (m *MessageController) AddOpenFalconMessage(ctx web.Context, messageStore s
return ctx.JSONError("invalid request, content required", http.StatusUnprocessableEntity)
}

return m.saveMessage(messageStore, *misc.OpenFalconToCommonMessage(tos, content), ctx)
return m.errorWrap(ctx, m.saveMessage(messageStore, *misc.OpenFalconToCommonMessage(tos, content), ctx))
}
37 changes: 28 additions & 9 deletions api/controller/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,11 @@ func (m *MessageController) Message(ctx web.Context, msgRepo repository.MessageR
return &message, nil
}

func (m *MessageController) saveMessage(messageRepo repository.MessageRepo, repoMessage misc.RepoMessage, ctx web.Context) web.Response {
id, err := messageRepo.Add(repoMessage.ToRepo())
func (m *MessageController) saveMessage(messageRepo repository.MessageRepo, repoMessage misc.RepoMessage) (id primitive.ObjectID, err error) {
return messageRepo.Add(repoMessage.ToRepo())
}

func (m *MessageController) errorWrap(ctx web.Context, id primitive.ObjectID, err error) web.Response {
if err != nil {
return ctx.JSONError(err.Error(), http.StatusInternalServerError)
}
Expand All @@ -180,7 +183,8 @@ func (m *MessageController) AddCommonMessage(ctx web.Context, messageRepo reposi
return ctx.JSONError(fmt.Sprintf("invalid request: %v", err), http.StatusUnprocessableEntity)
}

return m.saveMessage(messageRepo, commonMessage, ctx)
id, err := m.saveMessage(messageRepo, commonMessage)
return m.errorWrap(ctx, id, err)
}

// AddLogstashMessage Add logstash message
Expand All @@ -190,7 +194,8 @@ func (m *MessageController) AddLogstashMessage(ctx web.Context, messageRepo repo
return ctx.JSONError(err.Error(), http.StatusInternalServerError)
}

return m.saveMessage(messageRepo, commonMessage, ctx)
id, err := m.saveMessage(messageRepo, commonMessage)
return m.errorWrap(ctx, id, err)
}

// Add grafana message
Expand All @@ -200,17 +205,29 @@ func (m *MessageController) AddGrafanaMessage(ctx web.Context, messageRepo repos
return ctx.JSONError(err.Error(), http.StatusInternalServerError)
}

return m.saveMessage(messageRepo, commonMessage, ctx)
id, err := m.saveMessage(messageRepo, commonMessage)
return m.errorWrap(ctx, id, err)
}

// add prometheus alert message
func (m *MessageController) AddPrometheusMessage(ctx web.Context, messageRepo repository.MessageRepo) web.Response {
commonMessage, err := misc.PrometheusToCommonMessage(ctx.Request().Body())
commonMessages, err := misc.PrometheusToCommonMessages(ctx.Request().Body())
if err != nil {
return ctx.JSONError(err.Error(), http.StatusInternalServerError)
}

return m.saveMessage(messageRepo, commonMessage, ctx)
var lastID primitive.ObjectID
var lastErr error
for _, cm := range commonMessages {
lastID, lastErr = m.saveMessage(messageRepo, *cm)
if lastErr != nil {
log.WithFields(log.Fields{
"message": cm,
}).Errorf("save prometheus message failed: %v", lastErr)
}
}

return m.errorWrap(ctx, lastID, lastErr)
}

// add prometheus-alert message
Expand All @@ -220,7 +237,8 @@ func (m *MessageController) AddPrometheusAlertMessage(ctx web.Context, messageRe
return ctx.JSONError(err.Error(), http.StatusInternalServerError)
}

return m.saveMessage(messageRepo, commonMessage, ctx)
id, err := m.saveMessage(messageRepo, commonMessage)
return m.errorWrap(ctx, id, err)
}

// add open-falcon message
Expand All @@ -232,5 +250,6 @@ func (m *MessageController) AddOpenFalconMessage(ctx web.Context, messageRepo re
return ctx.JSONError("invalid request, content required", http.StatusUnprocessableEntity)
}

return m.saveMessage(messageRepo, misc.OpenFalconToCommonMessage(tos, content), ctx)
id, err := m.saveMessage(messageRepo, misc.OpenFalconToCommonMessage(tos, content))
return m.errorWrap(ctx, id, err)
}
Loading

0 comments on commit 6582c51

Please sign in to comment.