Skip to content

Commit

Permalink
Optimize monitor minio (#4679)
Browse files Browse the repository at this point in the history
* add default prome query object storage timeout second

* optimize object storage monitor
  • Loading branch information
bxy4543 committed Apr 12, 2024
1 parent a85d13d commit 2337ab1
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 20 deletions.
20 changes: 18 additions & 2 deletions controllers/pkg/objectstorage/objectstorage.go
Expand Up @@ -22,6 +22,8 @@ import (
"strings"
"time"

"github.com/labring/sealos/controllers/pkg/utils/env"

"github.com/prometheus/common/model"

"github.com/minio/minio-go/v7"
Expand All @@ -44,6 +46,18 @@ func ListUserObjectStorageBucket(client *minio.Client, username string) ([]strin
return expectBuckets, nil
}

func ListAllObjectStorageBucket(client *minio.Client) ([]string, error) {
buckets, err := client.ListBuckets(context.Background())
if err != nil {
return nil, err
}
var allBuckets []string
for _, bucket := range buckets {
allBuckets = append(allBuckets, bucket.Name)
}
return allBuckets, nil
}

func GetObjectStorageSize(client *minio.Client, bucket string) (int64, int64) {
objects := client.ListObjects(context.Background(), bucket, minio.ListObjectsOptions{
Recursive: true,
Expand Down Expand Up @@ -99,9 +113,11 @@ func GetUserObjectStorageFlow(client *minio.Client, promURL, username, instance
return totalFlow, nil
}

var timeoutDuration = time.Duration(env.GetInt64EnvWithDefault(EnvPromQueryObsTimeoutSecEnv, 10)) * time.Second

const (
timeoutDuration = 5 * time.Second
timeFormat = "2006-01-02 15:04:05"
EnvPromQueryObsTimeoutSecEnv = "PROM_QUERY_OBS_TIMEOUT_SEC"
timeFormat = "2006-01-02 15:04:05"
)

var (
Expand Down
81 changes: 63 additions & 18 deletions controllers/resources/controllers/monitor_controller.go
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"math"
"os"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -131,7 +132,7 @@ func NewMonitorReconciler(mgr ctrl.Manager) (*MonitorReconciler, error) {

func (r *MonitorReconciler) StartReconciler(ctx context.Context) error {
r.startPeriodicReconcile()
if r.TrafficClient != nil {
if r.TrafficClient != nil || r.ObjStorageClient != nil {
r.startMonitorTraffic()
}
<-ctx.Done()
Expand Down Expand Up @@ -191,14 +192,14 @@ func (r *MonitorReconciler) startMonitorTraffic() {
startTime, endTime := time.Now().UTC(), time.Now().Truncate(time.Hour).Add(1*time.Hour).UTC()
waitNextHour()
ticker := time.NewTicker(1 * time.Hour)
if err := r.MonitorPodTrafficUsed(startTime, endTime); err != nil {
if err := r.MonitorTrafficUsed(startTime, endTime); err != nil {
r.Logger.Error(err, "failed to monitor pod traffic used")
}
for {
select {
case <-ticker.C:
startTime, endTime = endTime, endTime.Add(1*time.Hour)
if err := r.MonitorPodTrafficUsed(startTime, endTime); err != nil {
if err := r.MonitorTrafficUsed(startTime, endTime); err != nil {
r.Logger.Error(err, "failed to monitor pod traffic used")
break
}
Expand Down Expand Up @@ -383,39 +384,83 @@ func (r *MonitorReconciler) getObjStorageUsed(user string, namedMap *map[string]
}
for i := range buckets {
size, count := objstorage.GetObjectStorageSize(r.ObjStorageClient, buckets[i])
if count == 0 || size == 0 {
if count == 0 || size <= 0 {
continue
}
end := time.Now().Truncate(time.Minute)
// get the traffic of the last minute
bytes, err := objstorage.GetObjectStorageFlow(r.PromURL, buckets[i], r.ObjectStorageInstance, end.Add(-time.Minute), end)
if err != nil {
return fmt.Errorf("failed to get object storage user storage flow: %w", err)
}
objStorageNamed := resources.NewObjStorageResourceNamed(buckets[i])
(*namedMap)[objStorageNamed.String()] = objStorageNamed
if _, ok := (*resMap)[objStorageNamed.String()]; !ok {
(*resMap)[objStorageNamed.String()] = initResources()
}
(*resMap)[objStorageNamed.String()][corev1.ResourceStorage].Add(*resource.NewQuantity(size, resource.BinarySI))
//If object storage traffic bytes is smaller than 0.1 MB, no record is recorded
if bytes >= 100*1024 {
(*resMap)[objStorageNamed.String()][resources.ResourceNetwork].Add(*resource.NewQuantity(bytes, resource.BinarySI))
}
}
return nil
}

func (r *MonitorReconciler) MonitorPodTrafficUsed(startTime, endTime time.Time) error {
logger.Info("start getPodTrafficUsed", "startTime", startTime.Format(time.RFC3339), "endTime", endTime.Format(time.RFC3339))
func (r *MonitorReconciler) MonitorTrafficUsed(startTime, endTime time.Time) error {
logger.Info("start getTrafficUsed", "startTime", startTime.Format(time.RFC3339), "endTime", endTime.Format(time.RFC3339))
execTime := time.Now().UTC()
if err := r.monitorPodTrafficUsed(startTime, endTime); err != nil {
r.Logger.Error(err, "failed to monitor pod traffic used")
if r.TrafficClient != nil {
if err := r.monitorPodTrafficUsed(startTime, endTime); err != nil {
r.Logger.Error(err, "failed to monitor pod traffic used")
}
}
if r.ObjStorageClient != nil {
if err := r.monitorObjectStorageTrafficUsed(startTime, endTime); err != nil {
r.Logger.Error(err, "failed to monitor object storage traffic used")
}
}
r.Logger.Info("success to monitor pod traffic used", "startTime", startTime.Format(time.RFC3339), "endTime", endTime.Format(time.RFC3339), "execTime", time.Since(execTime).String())
return nil
}

func (r *MonitorReconciler) monitorObjectStorageTrafficUsed(startTime, endTime time.Time) error {
buckets, err := objstorage.ListAllObjectStorageBucket(r.ObjStorageClient)
if err != nil {
return fmt.Errorf("failed to list object storage buckets: %w", err)
}
r.Logger.Info("object storage buckets", "buckets len", len(buckets))
wg, _ := errgroup.WithContext(context.Background())
wg.SetLimit(10)
for i := range buckets {
bucket := buckets[i]
if !strings.Contains(bucket, "-") {
continue
}
wg.Go(func() error {
return r.handlerObjectStorageTrafficUsed(startTime, endTime, bucket)
})
}
return wg.Wait()
}

func (r *MonitorReconciler) handlerObjectStorageTrafficUsed(startTime, endTime time.Time, bucket string) error {
bytes, err := objstorage.GetObjectStorageFlow(r.PromURL, bucket, r.ObjectStorageInstance, startTime, endTime)
if err != nil {
return fmt.Errorf("failed to get object storage flow: %w", err)
}
unit := r.Properties.StringMap[resources.ResourceNetwork].Unit
used := int64(math.Ceil(float64(resource.NewQuantity(bytes, resource.BinarySI).MilliValue()) / float64(unit.MilliValue())))
if used <= 0 {
return nil
}

namespace := "ns-" + strings.SplitN(bucket, "-", 2)[0]
ro := resources.Monitor{
Category: namespace,
Name: bucket,
Used: map[uint8]int64{r.Properties.StringMap[resources.ResourceNetwork].Enum: used},
Time: endTime.Add(-1 * time.Minute),
Type: resources.AppType[resources.ObjectStorage],
}
r.Logger.Info("object storage traffic used", "monitor", ro)
err = r.DBClient.InsertMonitor(context.Background(), &ro)
if err != nil {
return fmt.Errorf("failed to insert monitor: %w", err)
}
return nil
}

func (r *MonitorReconciler) monitorPodTrafficUsed(startTime, endTime time.Time) error {
monitors, err := r.DBClient.GetDistinctMonitorCombinations(startTime, endTime)
if err != nil {
Expand Down

0 comments on commit 2337ab1

Please sign in to comment.