diff --git a/controllers/pkg/database/interface.go b/controllers/pkg/database/interface.go index ad9e4ed3e53..e8ac6450f25 100644 --- a/controllers/pkg/database/interface.go +++ b/controllers/pkg/database/interface.go @@ -52,7 +52,7 @@ type Account interface { GetBillingCount(accountType common.Type, startTime, endTime time.Time) (count, amount int64, err error) GenerateBillingData(startTime, endTime time.Time, prols *resources.PropertyTypeLS, namespaces []string, owner string) (orderID []string, amount int64, err error) InsertMonitor(ctx context.Context, monitors ...*resources.Monitor) error - GetDistinctMonitorCombinations(startTime, endTime time.Time, namespace string) ([]resources.Monitor, error) + GetDistinctMonitorCombinations(startTime, endTime time.Time) ([]resources.Monitor, error) DropMonitorCollectionsOlderThan(days int) error Disconnect(ctx context.Context) error Creator diff --git a/controllers/pkg/database/mongo/account.go b/controllers/pkg/database/mongo/account.go index 70fac7e58e0..9b17dc06526 100644 --- a/controllers/pkg/database/mongo/account.go +++ b/controllers/pkg/database/mongo/account.go @@ -22,6 +22,8 @@ import ( "strings" "time" + "github.com/labring/sealos/controllers/pkg/utils/env" + "github.com/labring/sealos/controllers/pkg/common" "github.com/labring/sealos/controllers/pkg/database" @@ -39,9 +41,15 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + EnvAccountDBName = "ACCOUNT_DB_NAME" + EnvTrafficDBName = "TRAFFIC_DB_NAME" + EnvTrafficConn = "TRAFFIC_CONN" +) + const ( DefaultAccountDBName = "sealos-resources" - DefaultTrafficDBName = "sealos-networkmanager-synchronizer" + DefaultTrafficDBName = "sealos-networkmanager" DefaultAuthDBName = "sealos-auth" DefaultMeteringConn = "metering" DefaultMonitorConn = "monitor" @@ -251,14 +259,13 @@ func (m *mongoDB) InsertMonitor(ctx context.Context, monitors ...*resources.Moni return err } -func (m *mongoDB) GetDistinctMonitorCombinations(startTime, endTime time.Time, namespace string) ([]resources.Monitor, error) { +func (m *mongoDB) GetDistinctMonitorCombinations(startTime, endTime time.Time) ([]resources.Monitor, error) { pipeline := mongo.Pipeline{ {{Key: "$match", Value: bson.M{ "time": bson.M{ "$gte": startTime.UTC(), "$lt": endTime.UTC(), }, - "category": namespace, }}}, {{Key: "$group", Value: bson.M{ "_id": bson.M{ @@ -267,21 +274,23 @@ func (m *mongoDB) GetDistinctMonitorCombinations(startTime, endTime time.Time, n "type": "$type", }, }}}, + {{Key: "$project", Value: bson.M{ + "_id": 0, + "category": "$_id.category", + "name": "$_id.name", + "type": "$_id.type", + }}}, } cursor, err := m.getMonitorCollection(startTime).Aggregate(context.Background(), pipeline) if err != nil { return nil, fmt.Errorf("aggregate error: %v", err) } defer cursor.Close(context.Background()) - var monitors []resources.Monitor - for cursor.Next(context.Background()) { - var result = make(map[string]resources.Monitor, 1) - if err := cursor.Decode(result); err != nil { - return nil, fmt.Errorf("decode error: %v", err) - } - monitors = append(monitors, result["_id"]) + if !cursor.Next(context.Background()) { + return nil, nil } - if err := cursor.Err(); err != nil { + var monitors []resources.Monitor + if err := cursor.All(context.Background(), &monitors); err != nil { return nil, fmt.Errorf("cursor error: %v", err) } return monitors, nil @@ -941,8 +950,8 @@ func NewMongoInterface(ctx context.Context, URL string) (database.Interface, err err = client.Ping(ctx, nil) return &mongoDB{ Client: client, - AccountDB: DefaultAccountDBName, - TrafficDB: DefaultTrafficDBName, + AccountDB: env.GetEnvWithDefault(EnvAccountDBName, DefaultAccountDBName), + TrafficDB: env.GetEnvWithDefault(EnvTrafficDBName, DefaultTrafficDBName), AuthDB: DefaultAuthDBName, UserConn: DefaultUserConn, MeteringConn: DefaultMeteringConn, @@ -950,6 +959,6 @@ func NewMongoInterface(ctx context.Context, URL string) (database.Interface, err BillingConn: DefaultBillingConn, PricesConn: DefaultPricesConn, PropertiesConn: DefaultPropertiesConn, - TrafficConn: DefaultTrafficConn, + TrafficConn: env.GetEnvWithDefault(EnvTrafficConn, DefaultTrafficConn), }, err } diff --git a/controllers/pkg/database/mongo/account_test.go b/controllers/pkg/database/mongo/account_test.go index 397b33ae089..14d31d37204 100644 --- a/controllers/pkg/database/mongo/account_test.go +++ b/controllers/pkg/database/mongo/account_test.go @@ -594,3 +594,23 @@ func TestMongoDB_SetPropertyTypeLS(t *testing.T) { // t.Fatalf("failed to save property types: %v", err) //} } + +func Test_mongoDB_GetDistinctMonitorCombinations(t *testing.T) { + dbCTX := context.Background() + + m, err := NewMongoInterface(dbCTX, os.Getenv("MONGODB_URI")) + if err != nil { + t.Errorf("failed to connect mongo: error = %v", err) + } + defer func() { + if err = m.Disconnect(dbCTX); err != nil { + t.Errorf("failed to disconnect mongo: error = %v", err) + } + }() + queryTime := time.Now().UTC() + monitorCombinations, err := m.GetDistinctMonitorCombinations(queryTime.Add(-time.Hour), queryTime) + if err != nil { + t.Fatalf("failed to get distinct monitor combinations: %v", err) + } + t.Logf("monitorCombinations: %v", monitorCombinations) +} diff --git a/controllers/resources/controllers/monitor_controller.go b/controllers/resources/controllers/monitor_controller.go index 72078b1a062..bc65bebb47f 100644 --- a/controllers/resources/controllers/monitor_controller.go +++ b/controllers/resources/controllers/monitor_controller.go @@ -24,6 +24,8 @@ import ( "sync" "time" + "golang.org/x/sync/errgroup" + "github.com/labring/sealos/controllers/pkg/utils/env" "golang.org/x/sync/semaphore" @@ -400,47 +402,54 @@ func (r *MonitorReconciler) getObjStorageUsed(user string, namedMap *map[string] } func (r *MonitorReconciler) MonitorPodTrafficUsed(startTime, endTime time.Time) error { - namespaceList, err := r.getNamespaceList() - if err != nil { - return fmt.Errorf("failed to list namespaces") - } logger.Info("start getPodTrafficUsed", "startTime", startTime.Format(time.RFC3339), "endTime", endTime.Format(time.RFC3339)) - for _, namespace := range namespaceList.Items { - if err := r.monitorPodTrafficUsed(namespace, startTime, endTime); err != nil { - r.Logger.Error(err, "failed to monitor pod traffic used", "namespace", namespace.Name) - } + execTime := time.Now().UTC() + if err := r.monitorPodTrafficUsed(startTime, endTime); err != nil { + r.Logger.Error(err, "failed to monitor pod traffic used") } + r.Logger.Info("success to monitor pod traffic used", "startTime", startTime.Format(time.RFC3339), "endTime", endTime.Format(time.RFC3339), "execTime", time.Since(execTime).String()) return nil } -func (r *MonitorReconciler) monitorPodTrafficUsed(namespace corev1.Namespace, startTime, endTime time.Time) error { - monitors, err := r.DBClient.GetDistinctMonitorCombinations(startTime, endTime, namespace.Name) +func (r *MonitorReconciler) monitorPodTrafficUsed(startTime, endTime time.Time) error { + monitors, err := r.DBClient.GetDistinctMonitorCombinations(startTime, endTime) if err != nil { return fmt.Errorf("failed to get distinct monitor combinations: %w", err) } - for _, monitor := range monitors { - bytes, err := r.TrafficClient.GetTrafficSentBytes(startTime, endTime, namespace.Name, monitor.Type, monitor.Name) - if err != nil { - return fmt.Errorf("failed to get traffic sent bytes: %w", err) - } - unit := r.Properties.StringMap[resources.ResourceNetwork].Unit - used := int64(math.Ceil(float64(resource.NewQuantity(bytes, resource.BinarySI).MilliValue()) / float64(unit.MilliValue()))) - if used == 0 { - continue - } - logger.Info("traffic used ", "monitor", monitor, "used", used, "unit", unit, "bytes", bytes) - ro := resources.Monitor{ - Category: namespace.Name, - Name: monitor.Name, - Used: map[uint8]int64{r.Properties.StringMap[resources.ResourceNetwork].Enum: used}, - Time: endTime.Add(-1 * time.Minute), - Type: monitor.Type, - } - r.Logger.Info("monitor traffic used", "monitor", ro) - err = r.DBClient.InsertMonitor(context.Background(), &ro) - if err != nil { - return fmt.Errorf("failed to insert monitor: %w", err) - } + r.Logger.Info("distinct monitor combinations", "monitors len", len(monitors)) + wg, _ := errgroup.WithContext(context.Background()) + wg.SetLimit(100) + for i := range monitors { + monitor := monitors[i] + wg.Go(func() error { + return r.handlerTrafficUsed(startTime, endTime, monitor) + }) + } + return wg.Wait() +} + +func (r *MonitorReconciler) handlerTrafficUsed(startTime, endTime time.Time, monitor resources.Monitor) error { + bytes, err := r.TrafficClient.GetTrafficSentBytes(startTime, endTime, monitor.Category, monitor.Type, monitor.Name) + if err != nil { + return fmt.Errorf("failed to get traffic sent bytes: %w", err) + } + unit := r.Properties.StringMap[resources.ResourceNetwork].Unit + used := int64(math.Ceil(float64(resource.NewQuantity(bytes, resource.BinarySI).MilliValue()) / float64(unit.MilliValue()))) + if used == 0 { + return nil + } + //logger.Info("traffic used ", "monitor", monitor, "used", used, "unit", unit, "bytes", bytes) + ro := resources.Monitor{ + Category: monitor.Category, + Name: monitor.Name, + Used: map[uint8]int64{r.Properties.StringMap[resources.ResourceNetwork].Enum: used}, + Time: endTime.Add(-1 * time.Minute), + Type: monitor.Type, + } + r.Logger.Info("monitor traffic used", "monitor", ro) + err = r.DBClient.InsertMonitor(context.Background(), &ro) + if err != nil { + return fmt.Errorf("failed to insert monitor: %w", err) } return nil }