Skip to content

Commit

Permalink
monitor: reduce subscription mysql usage
Browse files Browse the repository at this point in the history
  • Loading branch information
zexi committed Jul 2, 2020
1 parent 029233a commit de889ec
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 15 deletions.
23 changes: 22 additions & 1 deletion pkg/monitor/models/commonalert.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,37 @@ var (

func init() {
CommonAlertManager = &SCommonAlertManager{
*NewAlertManager(SCommonAlert{}, "commonalert", "commonalerts"),
SAlertManager: *NewAlertManager(SCommonAlert{}, "commonalert", "commonalerts"),
}
CommonAlertManager.SetVirtualObject(CommonAlertManager)
}

type ISubscriptionManager interface {
SetAlert(alert *SCommonAlert)
DeleteAlert(alert *SCommonAlert)
}

type SCommonAlertManager struct {
SAlertManager
subscriptionManager ISubscriptionManager
}

type SCommonAlert struct {
SAlert
}

func (man *SCommonAlertManager) SetSubscriptionManager(sman ISubscriptionManager) {
man.subscriptionManager = sman
}

func (man *SCommonAlertManager) SetSubscriptionAlert(alert *SCommonAlert) {
man.subscriptionManager.SetAlert(alert)
}

func (man *SCommonAlertManager) DeleteSubscriptionAlert(alert *SCommonAlert) {
man.subscriptionManager.DeleteAlert(alert)
}

func (man *SCommonAlertManager) NamespaceScope() rbacutils.TRbacScope {
return rbacutils.ScopeSystem
}
Expand Down Expand Up @@ -246,6 +264,7 @@ func (alert *SCommonAlert) PostCreate(ctx context.Context,
if err != nil {
log.Errorln(errors.Wrap(err, "Alert PerformSetScope"))
}
CommonAlertManager.SetSubscriptionAlert(alert)
}

func (man *SCommonAlertManager) ListItemFilter(
Expand Down Expand Up @@ -552,6 +571,7 @@ func (alert *SCommonAlert) PostUpdate(
if err != nil {
log.Errorln(errors.Wrap(err, "Alert PerformSetScope"))
}
CommonAlertManager.SetSubscriptionAlert(alert)
}

func (alert *SCommonAlert) UpdateNotification(ctx context.Context, userCred mcclient.TokenCredential,
Expand Down Expand Up @@ -618,6 +638,7 @@ func (alert *SCommonAlert) customizeDeleteNotis(
}

func (alert *SCommonAlert) Delete(ctx context.Context, userCred mcclient.TokenCredential) error {
CommonAlertManager.DeleteSubscriptionAlert(alert)
return alert.SetStatus(userCred, monitor.ALERT_STATUS_DELETED, "")
}

Expand Down
19 changes: 14 additions & 5 deletions pkg/monitor/service/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ import (
"yunion.io/x/onecloud/pkg/monitor/subscriptionmodel"
)

var (
SubscriptionWorkerManager *appsrv.SWorkerManager
)

func init() {
SubscriptionWorkerManager = appsrv.NewWorkerManager("SubscriptionWorkerManager", 4, 1024, false)
}

func addCommonAlertDispatcher(prefix string, app *appsrv.Application) {
manager := db.NewModelHandler(subscriptionmodel.SubscriptionManager)

Expand All @@ -28,12 +36,13 @@ func addCommonAlertDispatcher(prefix string, app *appsrv.Application) {
}

func performHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
_, query, body := fetchEnv(ctx, w, r)
ctx = context.WithValue(context.Background(), auth.AUTH_TOKEN, auth.AdminCredential())

go subscriptionmodel.SubscriptionManager.PerformWrite(ctx, auth.AdminCredential(), query, body)
SubscriptionWorkerManager.Run(func() {
_, query, body := fetchEnv(ctx, w, r)
ctx = context.WithValue(context.Background(), auth.AUTH_TOKEN, auth.AdminCredential())
subscriptionmodel.SubscriptionManager.PerformWrite(ctx, auth.AdminCredential(), query, body)
appsrv.SendJSON(w, wrap(jsonutils.NewDict(), "subscription"))
}, nil, nil)

appsrv.SendJSON(w, wrap(jsonutils.NewDict(), "subscription"))
}

// fetchEnv fetch handler, params, query and body from ctx(context.Context)
Expand Down
1 change: 1 addition & 0 deletions pkg/monitor/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func StartService() {
defer cron.Stop()

subscriptionmodel.SubscriptionManager.AddSubscription()
models.CommonAlertManager.SetSubscriptionManager(subscriptionmodel.SubscriptionManager)

InitInfluxDBSubscriptionHandlers(app, baseOpts)
//common_app.ServeForever(app, baseOpts)
Expand Down
50 changes: 41 additions & 9 deletions pkg/monitor/subscriptionmodel/subcription.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"

"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/errors"

"yunion.io/x/onecloud/pkg/apis/monitor"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
Expand Down Expand Up @@ -38,12 +40,14 @@ func init() {
"subscription",
"subscriptions",
),
systemAlerts: new(sync.Map),
}
SubscriptionManager.SetVirtualObject(SubscriptionManager)
}

type SSubscriptionManager struct {
db.SVirtualResourceBaseManager
systemAlerts *sync.Map
}

func (self *SSubscriptionManager) getThisFunctionUrl() string {
Expand All @@ -62,27 +66,55 @@ func (self *SSubscriptionManager) AddSubscription() {
log.Errorln("DropSubscription err:", err)
return
}
log.Println("drop success")
log.Infof("drop success")
err = models.DataSourceManager.AddSubscription(sub)
if err != nil {
log.Errorln("add subscription err:", err)
return
}
log.Println("add success")
log.Infof("add success")
if err := self.LoadSystemAlerts(); err != nil {
log.Errorf("load system alerts error: %v", err)
return
}
}

func (self *SSubscriptionManager) LoadSystemAlerts() error {
alerts, err := models.CommonAlertManager.GetSystemAlerts()
if err != nil {
return errors.Wrap(err, "load system alerts")
}
for _, alert := range alerts {
self.SetAlert(&alert)
}
return nil
}

func (self *SSubscriptionManager) AllowPerformWrite(ctx context.Context,
userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) bool {
return true
}

func (self *SSubscriptionManager) SetAlert(alert *models.SCommonAlert) {
self.systemAlerts.Store(alert.GetId(), alert)
}

func (self *SSubscriptionManager) DeleteAlert(alert *models.SCommonAlert) {
self.systemAlerts.Delete(alert.GetId())
}

func (self *SSubscriptionManager) GetSystemAlerts() []*models.SCommonAlert {
ret := make([]*models.SCommonAlert, 0)
self.systemAlerts.Range(func(key, val interface{}) bool {
ret = append(ret, val.(*models.SCommonAlert))
return true
})
return nil
}

func (self *SSubscriptionManager) PerformWrite(ctx context.Context, userCred mcclient.TokenCredential,
query jsonutils.JSONObject, data []sub.Point) {
sysAlerts, err := models.CommonAlertManager.GetSystemAlerts()
if err != nil {
log.Errorln(err, "CommonAlertManager.GetSystemAlerts")
}
log.Errorf("subcription list sysalert length:%d", len(sysAlerts))
sysAlerts := self.GetSystemAlerts()
for _, sysalert := range sysAlerts {
details := monitor.CommonAlertDetails{}
details, err := sysalert.GetMoreDetails(details)
Expand All @@ -91,7 +123,7 @@ func (self *SSubscriptionManager) PerformWrite(ctx context.Context, userCred mcc
continue
}
for _, metricDetails := range details.CommonAlertMetricDetails {
evalMatch, match, err := self.Eval(*metricDetails, sysalert, data)
evalMatch, match, err := self.Eval(*metricDetails, *sysalert, data)
if err != nil {
log.Errorln("SSubscriptionManager Eval error:", err)
continue
Expand All @@ -113,7 +145,7 @@ func (self *SSubscriptionManager) PerformWrite(ctx context.Context, userCred mcc
Ctx: context.Background(),
UserCred: auth.AdminCredential(),
}
err := self.evalNotifyOfAlert(sysalert, *metricDetails, evalCtx)
err := self.evalNotifyOfAlert(*sysalert, *metricDetails, evalCtx)
if err != nil {
log.Errorln(err)
}
Expand Down

0 comments on commit de889ec

Please sign in to comment.