Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize monitor minio #4679

Merged
merged 3 commits into from Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 18 additions & 2 deletions controllers/pkg/objectstorage/objectstorage.go
Expand Up @@ -22,6 +22,8 @@ import (
"strings"
"time"

"github.com/labring/sealos/controllers/pkg/utils/env"

"github.com/prometheus/common/model"

"github.com/minio/minio-go/v7"
Expand All @@ -44,6 +46,18 @@ func ListUserObjectStorageBucket(client *minio.Client, username string) ([]strin
return expectBuckets, nil
}

func ListAllObjectStorageBucket(client *minio.Client) ([]string, error) {
buckets, err := client.ListBuckets(context.Background())
if err != nil {
return nil, err
}
var allBuckets []string
for _, bucket := range buckets {
allBuckets = append(allBuckets, bucket.Name)
}
return allBuckets, nil
}

func GetObjectStorageSize(client *minio.Client, bucket string) (int64, int64) {
objects := client.ListObjects(context.Background(), bucket, minio.ListObjectsOptions{
Recursive: true,
Expand Down Expand Up @@ -99,9 +113,11 @@ func GetUserObjectStorageFlow(client *minio.Client, promURL, username, instance
return totalFlow, nil
}

var timeoutDuration = time.Duration(env.GetInt64EnvWithDefault(EnvPromQueryObsTimeoutSecEnv, 10)) * time.Second

const (
timeoutDuration = 5 * time.Second
timeFormat = "2006-01-02 15:04:05"
EnvPromQueryObsTimeoutSecEnv = "PROM_QUERY_OBS_TIMEOUT_SEC"
timeFormat = "2006-01-02 15:04:05"
)

var (
Expand Down
81 changes: 63 additions & 18 deletions controllers/resources/controllers/monitor_controller.go
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"math"
"os"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -131,7 +132,7 @@ func NewMonitorReconciler(mgr ctrl.Manager) (*MonitorReconciler, error) {

func (r *MonitorReconciler) StartReconciler(ctx context.Context) error {
r.startPeriodicReconcile()
if r.TrafficClient != nil {
if r.TrafficClient != nil || r.ObjStorageClient != nil {
r.startMonitorTraffic()
}
<-ctx.Done()
Expand Down Expand Up @@ -191,14 +192,14 @@ func (r *MonitorReconciler) startMonitorTraffic() {
startTime, endTime := time.Now().UTC(), time.Now().Truncate(time.Hour).Add(1*time.Hour).UTC()
waitNextHour()
ticker := time.NewTicker(1 * time.Hour)
if err := r.MonitorPodTrafficUsed(startTime, endTime); err != nil {
if err := r.MonitorTrafficUsed(startTime, endTime); err != nil {
r.Logger.Error(err, "failed to monitor pod traffic used")
}
for {
select {
case <-ticker.C:
startTime, endTime = endTime, endTime.Add(1*time.Hour)
if err := r.MonitorPodTrafficUsed(startTime, endTime); err != nil {
if err := r.MonitorTrafficUsed(startTime, endTime); err != nil {
r.Logger.Error(err, "failed to monitor pod traffic used")
break
}
Expand Down Expand Up @@ -383,39 +384,83 @@ func (r *MonitorReconciler) getObjStorageUsed(user string, namedMap *map[string]
}
for i := range buckets {
size, count := objstorage.GetObjectStorageSize(r.ObjStorageClient, buckets[i])
if count == 0 || size == 0 {
if count == 0 || size <= 0 {
continue
}
end := time.Now().Truncate(time.Minute)
// get the traffic of the last minute
bytes, err := objstorage.GetObjectStorageFlow(r.PromURL, buckets[i], r.ObjectStorageInstance, end.Add(-time.Minute), end)
if err != nil {
return fmt.Errorf("failed to get object storage user storage flow: %w", err)
}
objStorageNamed := resources.NewObjStorageResourceNamed(buckets[i])
(*namedMap)[objStorageNamed.String()] = objStorageNamed
if _, ok := (*resMap)[objStorageNamed.String()]; !ok {
(*resMap)[objStorageNamed.String()] = initResources()
}
(*resMap)[objStorageNamed.String()][corev1.ResourceStorage].Add(*resource.NewQuantity(size, resource.BinarySI))
//If object storage traffic bytes is smaller than 0.1 MB, no record is recorded
if bytes >= 100*1024 {
(*resMap)[objStorageNamed.String()][resources.ResourceNetwork].Add(*resource.NewQuantity(bytes, resource.BinarySI))
}
}
return nil
}

func (r *MonitorReconciler) MonitorPodTrafficUsed(startTime, endTime time.Time) error {
logger.Info("start getPodTrafficUsed", "startTime", startTime.Format(time.RFC3339), "endTime", endTime.Format(time.RFC3339))
func (r *MonitorReconciler) MonitorTrafficUsed(startTime, endTime time.Time) error {
logger.Info("start getTrafficUsed", "startTime", startTime.Format(time.RFC3339), "endTime", endTime.Format(time.RFC3339))
execTime := time.Now().UTC()
if err := r.monitorPodTrafficUsed(startTime, endTime); err != nil {
r.Logger.Error(err, "failed to monitor pod traffic used")
if r.TrafficClient != nil {
if err := r.monitorPodTrafficUsed(startTime, endTime); err != nil {
r.Logger.Error(err, "failed to monitor pod traffic used")
}
}
if r.ObjStorageClient != nil {
if err := r.monitorObjectStorageTrafficUsed(startTime, endTime); err != nil {
r.Logger.Error(err, "failed to monitor object storage traffic used")
}
}
r.Logger.Info("success to monitor pod traffic used", "startTime", startTime.Format(time.RFC3339), "endTime", endTime.Format(time.RFC3339), "execTime", time.Since(execTime).String())
return nil
}

func (r *MonitorReconciler) monitorObjectStorageTrafficUsed(startTime, endTime time.Time) error {
buckets, err := objstorage.ListAllObjectStorageBucket(r.ObjStorageClient)
if err != nil {
return fmt.Errorf("failed to list object storage buckets: %w", err)
}
r.Logger.Info("object storage buckets", "buckets len", len(buckets))
wg, _ := errgroup.WithContext(context.Background())
wg.SetLimit(10)
for i := range buckets {
bucket := buckets[i]
if !strings.Contains(bucket, "-") {
continue
}
wg.Go(func() error {
return r.handlerObjectStorageTrafficUsed(startTime, endTime, bucket)
})
}
return wg.Wait()
}

func (r *MonitorReconciler) handlerObjectStorageTrafficUsed(startTime, endTime time.Time, bucket string) error {
bytes, err := objstorage.GetObjectStorageFlow(r.PromURL, bucket, r.ObjectStorageInstance, startTime, endTime)
if err != nil {
return fmt.Errorf("failed to get object storage flow: %w", err)
}
unit := r.Properties.StringMap[resources.ResourceNetwork].Unit
used := int64(math.Ceil(float64(resource.NewQuantity(bytes, resource.BinarySI).MilliValue()) / float64(unit.MilliValue())))
if used <= 0 {
return nil
}

namespace := "ns-" + strings.SplitN(bucket, "-", 2)[0]
ro := resources.Monitor{
Category: namespace,
Name: bucket,
Used: map[uint8]int64{r.Properties.StringMap[resources.ResourceNetwork].Enum: used},
Time: endTime.Add(-1 * time.Minute),
Type: resources.AppType[resources.ObjectStorage],
}
r.Logger.Info("object storage traffic used", "monitor", ro)
err = r.DBClient.InsertMonitor(context.Background(), &ro)
if err != nil {
return fmt.Errorf("failed to insert monitor: %w", err)
}
return nil
}

func (r *MonitorReconciler) monitorPodTrafficUsed(startTime, endTime time.Time) error {
monitors, err := r.DBClient.GetDistinctMonitorCombinations(startTime, endTime)
if err != nil {
Expand Down