diff --git a/controllers/pkg/database/interface.go b/controllers/pkg/database/interface.go index bc61aba5916..ad9e4ed3e53 100644 --- a/controllers/pkg/database/interface.go +++ b/controllers/pkg/database/interface.go @@ -29,6 +29,7 @@ import ( type Interface interface { Account Auth + Traffic } type Auth interface { @@ -51,11 +52,20 @@ type Account interface { GetBillingCount(accountType common.Type, startTime, endTime time.Time) (count, amount int64, err error) GenerateBillingData(startTime, endTime time.Time, prols *resources.PropertyTypeLS, namespaces []string, owner string) (orderID []string, amount int64, err error) InsertMonitor(ctx context.Context, monitors ...*resources.Monitor) error + GetDistinctMonitorCombinations(startTime, endTime time.Time, namespace string) ([]resources.Monitor, error) DropMonitorCollectionsOlderThan(days int) error Disconnect(ctx context.Context) error Creator } +type Traffic interface { + GetTrafficSentBytes(startTime, endTime time.Time, namespace string, _type uint8, name string) (int64, error) + GetTrafficRecvBytes(startTime, endTime time.Time, namespace string, _type uint8, name string) (int64, error) + + GetPodTrafficSentBytes(startTime, endTime time.Time, namespace string, name string) (int64, error) + GetPodTrafficRecvBytes(startTime, endTime time.Time, namespace string, name string) (int64, error) +} + type Creator interface { CreateBillingIfNotExist() error //suffix by day, eg: monitor_20200101 @@ -74,7 +84,8 @@ type MeteringOwnerTimeResult struct { //} const ( - MongoURI = "MONGO_URI" + MongoURI = "MONGO_URI" + TrafficMongoURI = "TRAFFIC_MONGO_URI" //MongoUsername = "MONGO_USERNAME" //MongoPassword = "MONGO_PASSWORD" //RetentionDay = "RETENTION_DAY" diff --git a/controllers/pkg/database/mongo/account.go b/controllers/pkg/database/mongo/account.go index 2355b32c61f..70fac7e58e0 100644 --- a/controllers/pkg/database/mongo/account.go +++ b/controllers/pkg/database/mongo/account.go @@ -36,12 +36,12 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "golang.org/x/sync/errgroup" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( DefaultAccountDBName = "sealos-resources" + DefaultTrafficDBName = "sealos-networkmanager-synchronizer" DefaultAuthDBName = "sealos-auth" DefaultMeteringConn = "metering" DefaultMonitorConn = "monitor" @@ -49,6 +49,8 @@ const ( DefaultUserConn = "user" DefaultPricesConn = "prices" DefaultPropertiesConn = "properties" + //TODO fix + DefaultTrafficConn = "traffic" ) const DefaultRetentionDay = 30 @@ -61,6 +63,7 @@ var cryptoKey = defaultCryptoKey type mongoDB struct { Client *mongo.Client AccountDB string + TrafficDB string AuthDB string UserConn string MonitorConnPrefix string @@ -68,6 +71,7 @@ type mongoDB struct { BillingConn string PricesConn string PropertiesConn string + TrafficConn string } type AccountBalanceSpecBSON struct { @@ -247,6 +251,42 @@ func (m *mongoDB) InsertMonitor(ctx context.Context, monitors ...*resources.Moni return err } +func (m *mongoDB) GetDistinctMonitorCombinations(startTime, endTime time.Time, namespace string) ([]resources.Monitor, error) { + pipeline := mongo.Pipeline{ + {{Key: "$match", Value: bson.M{ + "time": bson.M{ + "$gte": startTime.UTC(), + "$lt": endTime.UTC(), + }, + "category": namespace, + }}}, + {{Key: "$group", Value: bson.M{ + "_id": bson.M{ + "category": "$category", + "name": "$name", + "type": "$type", + }, + }}}, + } + cursor, err := m.getMonitorCollection(startTime).Aggregate(context.Background(), pipeline) + if err != nil { + return nil, fmt.Errorf("aggregate error: %v", err) + } + defer cursor.Close(context.Background()) + var monitors []resources.Monitor + for cursor.Next(context.Background()) { + var result = make(map[string]resources.Monitor, 1) + if err := cursor.Decode(result); err != nil { + return nil, fmt.Errorf("decode error: %v", err) + } + monitors = append(monitors, result["_id"]) + } + if err := cursor.Err(); err != nil { + return nil, fmt.Errorf("cursor error: %v", err) + } + return monitors, nil +} + func (m *mongoDB) GetAllPricesMap() (map[string]resources.Price, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -303,90 +343,6 @@ func (m *mongoDB) SavePropertyTypes(types []resources.PropertyType) error { return err } -// 2020-12-01 23:00:00 - 2020-12-02 00:00:00 -// 2020-12-02 00:00:00 - 2020-12-02 01:00:00 -func (m *mongoDB) GenerateMeteringData(startTime, endTime time.Time, prices map[string]resources.Price) error { - filter := bson.M{ - "time": bson.M{ - "$gte": startTime, - "$lt": endTime, - }, - } - cursor, err := m.getMonitorCollection(startTime).Find(context.Background(), filter) - if err != nil { - return fmt.Errorf("find monitors error: %v", err) - } - defer cursor.Close(context.Background()) - - meteringMap := make(map[string]map[string]int64) - countMap := make(map[string]map[string]int64) - updateTimeMap := make(map[string]map[string]*time.Time) - - for cursor.Next(context.Background()) { - var monitor resources.Monitor - if err := cursor.Decode(&monitor); err != nil { - return fmt.Errorf("decode monitor error: %v", err) - } - - if _, ok := updateTimeMap[monitor.Category]; !ok { - updateTimeMap[monitor.Category] = make(map[string]*time.Time) - } - if _, ok := updateTimeMap[monitor.Category][monitor.Property]; !ok { - lastUpdateTime, err := m.GetUpdateTimeForCategoryAndPropertyFromMetering(monitor.Category, monitor.Property) - if err != nil { - logger.Debug(err, "get latest update time failed", "category", monitor.Category, "property", monitor.Property) - } - updateTimeMap[monitor.Category][monitor.Property] = &lastUpdateTime - } - lastUpdateTime := updateTimeMap[monitor.Category][monitor.Property].UTC() - - if /* skip last update lte 1 hour*/ lastUpdateTime.Before(startTime) || lastUpdateTime.Equal(startTime) { - if _, ok := meteringMap[monitor.Category]; !ok { - meteringMap[monitor.Category] = make(map[string]int64) - countMap[monitor.Category] = make(map[string]int64) - } - //TODO interface will delete - //meteringMap[monitor.Category][monitor.Property] += monitor.Value - countMap[monitor.Category][monitor.Property]++ - continue - } - logger.Debug("Info", "skip metering", "category", monitor.Category, "property", monitor.Property, "lastUpdateTime", updateTimeMap[monitor.Category][monitor.Property].UTC(), "startTime", startTime) - } - - if err := cursor.Err(); err != nil { - return fmt.Errorf("cursor error: %v", err) - } - eg, _ := errgroup.WithContext(context.Background()) - - for category, propertyMap := range meteringMap { - for property, totalValue := range propertyMap { - count := countMap[category][property] - if count < 60 { - count = 60 - } - unitValue := math.Ceil(float64(totalValue) / float64(count)) - metering := &resources.Metering{ - Category: category, - Property: property, - Time: endTime, - Amount: int64(unitValue * float64(prices[property].Price)), - Value: int64(unitValue), - //Detail: "", - } - _category, _property := category, property - eg.Go(func() error { - _, err := m.getMeteringCollection().InsertOne(context.Background(), metering) - if err != nil { - //TODO if insert failed, should todo? - logger.Error(err, "insert metering data failed", "category", _category, "property", _property) - } - return err - }) - } - } - return eg.Wait() -} - /* monitors = append(monitors, &common.Monitor{ Category: namespace.Name, @@ -445,9 +401,14 @@ func (m *mongoDB) GenerateBillingData(startTime, endTime time.Time, prols *resou }}} continue } + if value.PriceType == resources.SUM { + groupStage = append(groupStage, primitive.E{Key: keyStr, Value: bson.D{{Key: "$sum", Value: "$used." + keyStr}}}) + usedStage[keyStr] = bson.D{{Key: "$toInt", Value: "$" + keyStr}} + continue + } groupStage = append(groupStage, primitive.E{Key: keyStr, Value: bson.D{{Key: "$sum", Value: "$used." + keyStr}}}) usedStage[keyStr] = bson.D{{Key: "$toInt", Value: bson.D{{Key: "$round", Value: bson.D{{Key: "$divide", Value: bson.A{ - "$" + keyStr, bson.D{{Key: "$cond", Value: bson.A{bson.D{{Key: "$gt", Value: bson.A{"$count", minutes}}}, "$count", minutes}}}}}}}}}} + "$" + keyStr, minutes}}}}}}} } // add the used phase to the $project phase @@ -981,6 +942,7 @@ func NewMongoInterface(ctx context.Context, URL string) (database.Interface, err return &mongoDB{ Client: client, AccountDB: DefaultAccountDBName, + TrafficDB: DefaultTrafficDBName, AuthDB: DefaultAuthDBName, UserConn: DefaultUserConn, MeteringConn: DefaultMeteringConn, @@ -988,5 +950,6 @@ func NewMongoInterface(ctx context.Context, URL string) (database.Interface, err BillingConn: DefaultBillingConn, PricesConn: DefaultPricesConn, PropertiesConn: DefaultPropertiesConn, + TrafficConn: DefaultTrafficConn, }, err } diff --git a/controllers/pkg/database/mongo/traffic.go b/controllers/pkg/database/mongo/traffic.go new file mode 100644 index 00000000000..e5f304cfa95 --- /dev/null +++ b/controllers/pkg/database/mongo/traffic.go @@ -0,0 +1,131 @@ +// Copyright © 2024 sealos. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mongo + +import ( + "context" + "time" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" +) + +/* example: +{ + _id: ObjectId("60eea26373c4cdcb6356827d"), + traffic_meta: { + pod_name: "my-pod", + pod_namespace: "my-namespace", + pod_address: "100.64.0.1", + traffic_tag: "port:80", + pod_type: 1, + pod_type_name: "mongodb" + }, + timestamp: "2024-01-04T04:02:25", + sent_bytes: 31457280, + recv_bytes: 15728640 + } +*/ + +func (m *mongoDB) GetTrafficRecvBytes(startTime, endTime time.Time, namespace string, _type uint8, name string) (int64, error) { + return m.getTrafficBytes(false, startTime, endTime, namespace, _type, name) +} + +func (m *mongoDB) GetTrafficSentBytes(startTime, endTime time.Time, namespace string, _type uint8, name string) (int64, error) { + return m.getTrafficBytes(true, startTime, endTime, namespace, _type, name) +} + +func (m *mongoDB) GetPodTrafficSentBytes(startTime, endTime time.Time, namespace string, name string) (int64, error) { + return m.getPodTrafficBytes(true, startTime, endTime, namespace, name) +} + +func (m *mongoDB) GetPodTrafficRecvBytes(startTime, endTime time.Time, namespace string, name string) (int64, error) { + return m.getPodTrafficBytes(false, startTime, endTime, namespace, name) +} + +func (m *mongoDB) getPodTrafficBytes(sent bool, startTime, endTime time.Time, namespace string, name string) (int64, error) { + filter := bson.M{ + "traffic_meta.pod_namespace": namespace, + "traffic_meta.pod_name": name, + "timestamp": bson.M{ + "$gte": startTime, + "$lt": endTime, + }, + } + pipeline := mongo.Pipeline{ + bson.D{{Key: "$match", Value: filter}}, + } + if sent { + pipeline = append(pipeline, bson.D{{Key: "$group", Value: bson.D{{Key: "_id", Value: nil}, {Key: "total", Value: bson.D{{Key: "$sum", Value: "$sent_bytes"}}}}}}) + } else { + pipeline = append(pipeline, bson.D{{Key: "$group", Value: bson.D{{Key: "_id", Value: nil}, {Key: "total", Value: bson.D{{Key: "$sum", Value: "$recv_bytes"}}}}}}) + } + cur, err := m.getTrafficCollection().Aggregate(context.Background(), pipeline) + if err != nil { + return 0, err + } + defer cur.Close(context.Background()) + total := int64(0) + for cur.Next(context.Background()) { + var result struct { + Total int64 `bson:"total"` + } + if err := cur.Decode(&result); err != nil { + return 0, err + } + total += result.Total + } + return total, nil +} + +func (m *mongoDB) getTrafficBytes(sent bool, startTime, endTime time.Time, namespace string, _type uint8, name string) (int64, error) { + filter := bson.M{ + "traffic_meta.pod_namespace": namespace, + "traffic_meta.pod_type": _type, + "traffic_meta.pod_type_name": name, + "timestamp": bson.M{ + "$gte": startTime, + "$lte": endTime, + }, + } + pipeline := mongo.Pipeline{ + bson.D{{Key: "$match", Value: filter}}, + } + if sent { + pipeline = append(pipeline, bson.D{{Key: "$group", Value: bson.D{{Key: "_id", Value: nil}, {Key: "total", Value: bson.D{{Key: "$sum", Value: "$sent_bytes"}}}}}}) + } else { + pipeline = append(pipeline, bson.D{{Key: "$group", Value: bson.D{{Key: "_id", Value: nil}, {Key: "total", Value: bson.D{{Key: "$sum", Value: "$recv_bytes"}}}}}}) + } + cur, err := m.getTrafficCollection().Aggregate(context.Background(), pipeline) + if err != nil { + return 0, err + } + defer cur.Close(context.Background()) + total := int64(0) + for cur.Next(context.Background()) { + var result struct { + Total int64 `bson:"total"` + } + if err := cur.Decode(&result); err != nil { + return 0, err + } + total += result.Total + } + return total, nil +} + +func (m *mongoDB) getTrafficCollection() *mongo.Collection { + return m.Client.Database(m.TrafficDB).Collection(m.TrafficConn) +} diff --git a/controllers/pkg/database/mongo/traffic_test.go b/controllers/pkg/database/mongo/traffic_test.go new file mode 100644 index 00000000000..974df19e10a --- /dev/null +++ b/controllers/pkg/database/mongo/traffic_test.go @@ -0,0 +1,46 @@ +// Copyright © 2024 sealos. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mongo + +//import ( +// "context" +// "os" +// "testing" +// "time" +//) +// +//func Test_mongoDB_GetPodTrafficSentBytes(t *testing.T) { +// dbCTX := context.Background() +// +// m, err := NewMongoInterface(dbCTX, os.Getenv("MONGO_URL")) +// if err != nil { +// t.Errorf("failed to connect mongo: error = %v", err) +// } +// defer func() { +// if err = m.Disconnect(dbCTX); err != nil { +// t.Errorf("failed to disconnect mongo: error = %v", err) +// } +// }() +// +// //2024-01-10T06:10:24.281+00:00-2024-01-10T06:12:24.281+00:00 +// startTime, _ := time.Parse(time.RFC3339, "2024-01-10T06:10:24.281+00:00") +// endTime, _ := time.Parse(time.RFC3339, "2024-01-10T07:12:24.281+00:00") +// t.Logf("startTime = %v, endTime = %v", startTime, endTime) +// bytes, err := m.GetTrafficSentBytes(startTime, endTime, "ns-8k7qhyy3", 3, "ros-minio-qzqtpjlv") +// if err != nil { +// t.Errorf("failed to get pod traffic sent bytes: error = %v", err) +// } +// t.Logf("bytes = %v", bytes) +//} diff --git a/controllers/pkg/objectstorage/objectstorage.go b/controllers/pkg/objectstorage/objectstorage.go index 9b2ffdf8bfc..69f927df482 100644 --- a/controllers/pkg/objectstorage/objectstorage.go +++ b/controllers/pkg/objectstorage/objectstorage.go @@ -135,11 +135,11 @@ func QueryPrometheus(host, bucketName, instance string) (int64, error) { rcvdBytes, err := strconv.ParseInt(rcvdStr, 10, 64) if err != nil { - return 0, fmt.Errorf("failed to parse rcvdBytes to int64") + return 0, fmt.Errorf("failed to parse rcvdStr %s to int64: %v", rcvdStr, err) } sentBytes, err := strconv.ParseInt(sentStr, 10, 64) if err != nil { - return 0, fmt.Errorf("failed to parse rcvdBytes to int64") + return 0, fmt.Errorf("failed to parse sentStr %s to int64: %v", sentStr, err) } fmt.Printf("received bytes: %d, send bytes: %d\n", rcvdBytes, sentBytes) diff --git a/controllers/pkg/resources/resources.go b/controllers/pkg/resources/resources.go index b084ec38d16..824b7d91a37 100644 --- a/controllers/pkg/resources/resources.go +++ b/controllers/pkg/resources/resources.go @@ -258,7 +258,7 @@ var DefaultPropertyTypeList = []PropertyType{ { Name: "network", Enum: 3, - PriceType: DIF, + PriceType: SUM, UnitPrice: 0, UnitString: "1Mi", }, @@ -318,15 +318,7 @@ func decryptPrice(types []PropertyType) ([]PropertyType, error) { return types, fmt.Errorf("failed to decrypt %s unit price : %v", types[i].Name, err) } types[i].UnitPrice = price - logger.Info("decrypt unit_price: ", types[i].UnitPrice) - //if types[i].UnitPrice != 0 { - // price, err := crypto.EncryptInt64(types[i].UnitPrice) - // if err != nil { - // logger.Error("failed to encrypt unit price : %v", err) - // } else { - // types[i].EncryptUnitPrice = *price - // } - //} + logger.Info("parse properties", types[i].Enum, types[i].UnitPrice) } return types, nil } @@ -337,26 +329,6 @@ type PropertyTypeStringMap map[string]PropertyType type PropertyList []PropertyType -// | Category | Property | Time | Value (average value) | Amount (value * price) | Detail | Status | -// | ---------- | -------- | ---------- | --------------------- | ---------------------- | ------ | ------ | -// | Namespace1 | Cpu | 2023010112 | 1000 | 67000 | | 0 | -type Metering struct { - Category string `json:"category" bson:"category"` - //time 2023010112 -> 2023-01-01 11:00:00 - 2023-01-01 12:00:00 - Amount int64 `json:"amount" bson:"amount"` - // 2023010112 -> 2023-01-01 12:00:00 - Property string `json:"property" bson:"property"` - Value int64 `json:"value" bson:"value"` - Time time.Time `json:"time" bson:"time"` - Detail string `json:"detail" bson:"detail"` - // 0 -> not settled, 1 -> settled, -1 -> deleted, -2 -> refunded - //Status int `json:"status" bson:"status"` -} -type QuantityDetail struct { - *resource.Quantity - Detail string -} - // GpuResourcePrefix GPUResource = gpu- + gpu.Product ; ex. gpu-tesla-v100 const GpuResourcePrefix = "gpu-" @@ -378,59 +350,6 @@ func GetGpuResourceProduct(resource string) string { return strings.TrimPrefix(resource, GpuResourcePrefix) } -var DefaultPrices = map[string]Price{ - "cpu": { - Property: "cpu", - Price: 67, - }, - "memory": { - Property: "memory", - Price: 33, - }, - "storage": { - Property: "storage", - Price: 2, - }, -} - -// infra residual code -//const ( -// PropertyInfraCPU = "infra-cpu" -// PropertyInfraMemory = "infra-memory" -// PropertyInfraDisk = "infra-disk" -//) -//var ( -// bin1Mi = resource.NewQuantity(1<<20, resource.BinarySI) -// cpuUnit = resource.MustParse("1m") -//) -//var PricesUnit = map[corev1.ResourceName]*resource.Quantity{ -// corev1.ResourceCPU: &cpuUnit, // 1 m CPU (1000 μ) -// ResourceGPU: &cpuUnit, // 1 m CPU (1000 μ) -// corev1.ResourceMemory: bin1Mi, // 1 MiB -// corev1.ResourceStorage: bin1Mi, // 1 MiB -// ResourceNetwork: bin1Mi, // 1 MiB -//} -// -//// Core -//var infraCPUMap = map[string]int{ -// "t2.medium": 2, -// "t2.large": 2, -// "t2.xlarge": 4, -// "ecs.c7.large": 2, -// "ecs.g7.large": 2, -// "ecs.g7.xlarge": 4, -//} -// -//// GiB -//var infraMemoryMap = map[string]int{ -// "t2.medium": 4, -// "t2.large": 8, -// "t2.xlarge": 16, -// "ecs.c7.large": 4, -// "ecs.g7.large": 8, -// "ecs.g7.xlarge": 16, -//} - func GetDefaultResourceQuota(ns, name string) *corev1.ResourceQuota { return &corev1.ResourceQuota{ ObjectMeta: metav1.ObjectMeta{ @@ -479,7 +398,7 @@ func DefaultResourceQuotaHard() corev1.ResourceList { corev1.ResourceLimitsMemory: resource.MustParse(env.GetEnvWithDefault(QuotaLimitsMemory, DefaultQuotaLimitsMemory)), corev1.ResourceRequestsStorage: resource.MustParse(env.GetEnvWithDefault(QuotaLimitsStorage, DefaultQuotaLimitsStorage)), corev1.ResourceLimitsEphemeralStorage: resource.MustParse(env.GetEnvWithDefault(QuotaLimitsStorage, DefaultQuotaLimitsStorage)), - corev1.ResourceServicesNodePorts: resource.MustParse(DefaultQuotaLimitsNodePorts), + corev1.ResourceServicesNodePorts: resource.MustParse(env.GetEnvWithDefault(QuotaLimitsNodePorts, DefaultQuotaLimitsNodePorts)), //TODO storage.diskio.read, storage.diskio.write } } @@ -499,25 +418,3 @@ var LimitRangeDefault = corev1.ResourceList{ corev1.ResourceMemory: resource.MustParse("64Mi"), corev1.ResourceEphemeralStorage: resource.MustParse("100Mi"), } - -// -//// MiB -//func GetInfraCPUQuantity(flavor string, count int) *resource.Quantity { -// if v, ok := infraCPUMap[flavor]; ok { -// return resource.NewQuantity(int64(v*count), resource.DecimalSI) -// } -// return nil -//} -// -//// Gib -//func GetInfraMemoryQuantity(flavor string, count int) *resource.Quantity { -// if v, ok := infraMemoryMap[flavor]; ok { -// return resource.NewQuantity(int64((v*count)<<30), resource.BinarySI) -// } -// return nil -//} -// -//// Gib -//func GetInfraDiskQuantity(capacity int) *resource.Quantity { -// return resource.NewQuantity(int64(capacity<<30), resource.BinarySI) -//} diff --git a/controllers/resources/controllers/monitor_controller.go b/controllers/resources/controllers/monitor_controller.go index 911abf47e1c..72078b1a062 100644 --- a/controllers/resources/controllers/monitor_controller.go +++ b/controllers/resources/controllers/monitor_controller.go @@ -19,25 +19,28 @@ package controllers import ( "context" "fmt" - "io" "math" "os" - "strings" "sync" "time" + "github.com/labring/sealos/controllers/pkg/utils/env" + + "golang.org/x/sync/semaphore" + + "k8s.io/apimachinery/pkg/selection" + + "k8s.io/apimachinery/pkg/labels" + + userv1 "github.com/labring/sealos/controllers/user/api/v1" + "github.com/labring/sealos/controllers/user/controllers/helper/config" "github.com/minio/minio-go/v7" - sealos_networkmanager "github.com/dinoallo/sealos-networkmanager-protoapi" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - objstorage "github.com/labring/sealos/controllers/pkg/objectstorage" "github.com/go-logr/logr" - "golang.org/x/sync/semaphore" "github.com/labring/sealos/controllers/pkg/database" "github.com/labring/sealos/controllers/pkg/gpu" @@ -63,7 +66,7 @@ type MonitorReconciler struct { periodicReconcile time.Duration NvidiaGpu map[string]gpu.NvidiaGPU DBClient database.Interface - TrafficSvcConn string + TrafficClient database.Interface Properties *resources.PropertyTypeLS PromURL string ObjStorageClient *minio.Client @@ -76,19 +79,17 @@ type quantity struct { } const ( - TrafficSvcConn = "TRAFFICS_SERVICE_CONNECT_ADDRESS" PrometheusURL = "PROM_URL" ObjectStorageInstance = "OBJECT_STORAGE_INSTANCE" + ConcurrentLimit = "CONCURRENT_LIMIT" ) +var concurrentLimit = int64(DefaultConcurrencyLimit) + const ( - namespaceMonitorResources = "NAMESPACE-RESOURCES" - namespaceResourcePod, namespaceResourceInfra = "pod", "infra" - MaxConcurrencyLimit = 1000 + DefaultConcurrencyLimit = 1000 ) -var namespaceMonitorFuncs = make(map[string]func(ctx context.Context, namespace *corev1.Namespace) error) - //+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch //+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch //+kubebuilder:rbac:groups=core,resources=namespaces,verbs=get;list;watch @@ -107,10 +108,10 @@ func NewMonitorReconciler(mgr ctrl.Manager) (*MonitorReconciler, error) { Logger: ctrl.Log.WithName("controllers").WithName("Monitor"), stopCh: make(chan struct{}), periodicReconcile: 1 * time.Minute, - TrafficSvcConn: os.Getenv(TrafficSvcConn), PromURL: os.Getenv(PrometheusURL), ObjectStorageInstance: os.Getenv(ObjectStorageInstance), } + concurrentLimit = env.GetInt64EnvWithDefault(ConcurrentLimit, DefaultConcurrencyLimit) var err error err = retry.Retry(2, 1*time.Second, func() error { r.NvidiaGpu, err = gpu.GetNodeGpuModel(mgr.GetClient()) @@ -123,28 +124,14 @@ func NewMonitorReconciler(mgr ctrl.Manager) (*MonitorReconciler, error) { return nil, err } r.Logger.Info("get gpu model", "gpu model", r.NvidiaGpu) - r.startPeriodicReconcile() return r, nil } -func (r *MonitorReconciler) initNamespaceFuncs() { - res := os.Getenv(namespaceMonitorResources) - if res == "" { - res = namespaceResourcePod - } - //utils.GetEnvWithDefault(namespaceMonitorResources, namespaceResourcePod), "," - namespaceResourceList := strings.Split(res, ",") - for _, res := range namespaceResourceList { - switch res { - case namespaceResourcePod: - namespaceMonitorFuncs[namespaceResourcePod] = r.podResourceUsageInsert - case namespaceResourceInfra: - //namespaceMonitorFuncs[namespaceResourceInfra] = r.infraResourceUsage - } - } -} - func (r *MonitorReconciler) StartReconciler(ctx context.Context) error { + r.startPeriodicReconcile() + if r.TrafficClient != nil { + r.startMonitorTraffic() + } <-ctx.Done() r.stopPeriodicReconcile() return nil @@ -154,11 +141,7 @@ func (r *MonitorReconciler) startPeriodicReconcile() { r.wg.Add(1) go func() { defer r.wg.Done() - waitTime := time.Until(time.Now().Truncate(time.Minute).Add(1 * time.Minute)) - if waitTime > 0 { - logger.Info("wait for first reconcile", "waitTime", waitTime) - time.Sleep(waitTime) - } + waitNextMinute() ticker := time.NewTicker(r.periodicReconcile) for { select { @@ -172,44 +155,87 @@ func (r *MonitorReconciler) startPeriodicReconcile() { }() } +func (r *MonitorReconciler) getNamespaceList() (*corev1.NamespaceList, error) { + namespaceList := &corev1.NamespaceList{} + req, err := labels.NewRequirement(userv1.UserLabelOwnerKey, selection.Exists, nil) + if err != nil { + return nil, fmt.Errorf("failed to create label requirement: %v", err) + } + return namespaceList, r.List(context.Background(), namespaceList, &client.ListOptions{ + LabelSelector: labels.NewSelector().Add(*req), + }) +} + +func waitNextMinute() { + waitTime := time.Until(time.Now().Truncate(time.Minute).Add(1 * time.Minute)) + if waitTime > 0 { + logger.Info("wait for first reconcile", "waitTime", waitTime) + time.Sleep(waitTime) + } +} + +func waitNextHour() { + waitTime := time.Until(time.Now().Truncate(time.Hour).Add(1 * time.Hour)) + if waitTime > 0 { + logger.Info("wait for first reconcile", "waitTime", waitTime) + time.Sleep(waitTime) + } +} + +func (r *MonitorReconciler) startMonitorTraffic() { + r.wg.Add(1) + go func() { + defer r.wg.Done() + startTime, endTime := time.Now().UTC(), time.Now().Truncate(time.Hour).Add(1*time.Hour).UTC() + waitNextHour() + ticker := time.NewTicker(1 * time.Hour) + if err := r.MonitorPodTrafficUsed(startTime, endTime); err != nil { + r.Logger.Error(err, "failed to monitor pod traffic used") + } + for { + select { + case <-ticker.C: + startTime, endTime = endTime, endTime.Add(1*time.Hour) + if err := r.MonitorPodTrafficUsed(startTime, endTime); err != nil { + r.Logger.Error(err, "failed to monitor pod traffic used") + break + } + case <-r.stopCh: + ticker.Stop() + return + } + } + }() +} + func (r *MonitorReconciler) stopPeriodicReconcile() { close(r.stopCh) r.wg.Wait() } func (r *MonitorReconciler) enqueueNamespacesForReconcile() { - ctx := context.Background() r.Logger.Info("enqueue namespaces for reconcile", "time", time.Now().Format(time.RFC3339)) - namespaceList := &corev1.NamespaceList{} - if err := r.Client.List(ctx, namespaceList); err != nil { + namespaceList, err := r.getNamespaceList() + if err != nil { r.Logger.Error(err, "failed to list namespaces") return } - if err := r.processNamespaceList(ctx, namespaceList); err != nil { + if err := r.processNamespaceList(namespaceList); err != nil { r.Logger.Error(err, "failed to process namespace", "time", time.Now().Format(time.RFC3339)) } - //r.namespaceListQueue <- namespaceList - - //for i := range namespaceList.Items { - // r.namespaceQueue <- &namespaceList.Items[i] - //} } -func (r *MonitorReconciler) processNamespaceList(ctx context.Context, namespaceList *corev1.NamespaceList) error { +func (r *MonitorReconciler) processNamespaceList(namespaceList *corev1.NamespaceList) error { logger.Info("start processNamespaceList", "namespaceList len", len(namespaceList.Items), "time", time.Now().Format(time.RFC3339)) if len(namespaceList.Items) == 0 { r.Logger.Error(fmt.Errorf("no namespace to process"), "") return nil } + sem := semaphore.NewWeighted(concurrentLimit) wg := sync.WaitGroup{} wg.Add(len(namespaceList.Items)) - concurrencyLimit := len(namespaceList.Items) - if concurrencyLimit > MaxConcurrencyLimit { - concurrencyLimit = MaxConcurrencyLimit - } - sem := semaphore.NewWeighted(int64(concurrencyLimit)) for i := range namespaceList.Items { go func(namespace *corev1.Namespace) { defer wg.Done() @@ -218,8 +244,8 @@ func (r *MonitorReconciler) processNamespaceList(ctx context.Context, namespaceL return } defer sem.Release(1) - if err := r.processNamespace(ctx, namespace); err != nil { - r.Logger.Error(err, "failed to process namespace", "namespace", namespace.Name) + if err := r.monitorResourceUsage(namespace); err != nil { + r.Logger.Error(err, "monitor pod resource", "namespace", namespace.Name) } }(&namespaceList.Items[i]) } @@ -228,36 +254,13 @@ func (r *MonitorReconciler) processNamespaceList(ctx context.Context, namespaceL return nil } -func (r *MonitorReconciler) processNamespace(ctx context.Context, namespace *corev1.Namespace) error { - //for res := range namespaceMonitorFuncs { - // if err := namespaceMonitorFuncs[res](ctx, dbClient, namespace); err != nil { - // r.Logger.Error(err, "monitor namespace resource", "resource", res, "namespace", namespace.Name) - // return err - // } - //} - if err := r.podResourceUsageInsert(ctx, namespace); err != nil { - r.Logger.Error(err, "monitor pod resource", "namespace", namespace.Name) - return err - } - - return nil -} - -func (r *MonitorReconciler) podResourceUsageInsert(ctx context.Context, namespace *corev1.Namespace) error { - monitors, err := r.getResourceUsage(namespace) - if err != nil { - return fmt.Errorf("failed to get resource usage: %v", err) - } - return r.DBClient.InsertMonitor(ctx, monitors...) -} - -func (r *MonitorReconciler) getResourceUsage(namespace *corev1.Namespace) ([]*resources.Monitor, error) { +func (r *MonitorReconciler) monitorResourceUsage(namespace *corev1.Namespace) error { timeStamp := time.Now().UTC() podList := corev1.PodList{} resUsed := map[string]map[corev1.ResourceName]*quantity{} resNamed := make(map[string]*resources.ResourceNamed) if err := r.List(context.Background(), &podList, &client.ListOptions{Namespace: namespace.Name}); err != nil { - return nil, err + return err } for _, pod := range podList.Items { if pod.Spec.NodeName == "" || (pod.Status.Phase == corev1.PodSucceeded && time.Since(pod.Status.StartTime.Time) > 1*time.Minute) { @@ -298,7 +301,7 @@ func (r *MonitorReconciler) getResourceUsage(namespace *corev1.Namespace) ([]*re pvcList := corev1.PersistentVolumeClaimList{} if err := r.List(context.Background(), &pvcList, &client.ListOptions{Namespace: namespace.Name}); err != nil { - return nil, fmt.Errorf("failed to list pvc: %v", err) + return fmt.Errorf("failed to list pvc: %v", err) } for _, pvc := range pvcList.Items { if pvc.Status.Phase != corev1.ClaimBound || pvc.Name == resources.KubeBlocksBackUpName { @@ -313,7 +316,7 @@ func (r *MonitorReconciler) getResourceUsage(namespace *corev1.Namespace) ([]*re } svcList := corev1.ServiceList{} if err := r.List(context.Background(), &svcList, &client.ListOptions{Namespace: namespace.Name}); err != nil { - return nil, fmt.Errorf("failed to list svc: %v", err) + return fmt.Errorf("failed to list svc: %v", err) } for _, svc := range svcList.Items { if svc.Spec.Type != corev1.ServiceTypeNodePort { @@ -330,34 +333,13 @@ func (r *MonitorReconciler) getResourceUsage(namespace *corev1.Namespace) ([]*re var monitors []*resources.Monitor - getResourceUsed := func(podResource map[corev1.ResourceName]*quantity) (bool, map[uint8]int64) { - used := map[uint8]int64{} - isEmpty := true - for i := range podResource { - if podResource[i].MilliValue() == 0 { - continue - } - isEmpty = false - if pType, ok := r.Properties.StringMap[i.String()]; ok { - used[pType.Enum] = int64(math.Ceil(float64(podResource[i].MilliValue()) / float64(pType.Unit.MilliValue()))) - continue - } - r.Logger.Error(fmt.Errorf("not found resource type"), "resource", i.String()) - } - return isEmpty, used - } - if r.TrafficSvcConn != "" { - if err := r.getPodTrafficUsed(namespace.Name, &resNamed, &resUsed); err != nil { - r.Logger.Error(err, "failed to get pod traffic used", "namespace", namespace) - } - } if username := config.GetUserNameByNamespace(namespace.Name); r.ObjStorageClient != nil { if err := r.getObjStorageUsed(username, &resNamed, &resUsed); err != nil { r.Logger.Error(err, "failed to get object storage used", "username", username) } } for name, podResource := range resUsed { - isEmpty, used := getResourceUsed(podResource) + isEmpty, used := r.getResourceUsed(podResource) if isEmpty { continue } @@ -369,7 +351,24 @@ func (r *MonitorReconciler) getResourceUsage(namespace *corev1.Namespace) ([]*re Name: resNamed[name].Name(), }) } - return monitors, nil + return r.DBClient.InsertMonitor(context.Background(), monitors...) +} + +func (r *MonitorReconciler) getResourceUsed(podResource map[corev1.ResourceName]*quantity) (bool, map[uint8]int64) { + used := map[uint8]int64{} + isEmpty := true + for i := range podResource { + if podResource[i].MilliValue() == 0 { + continue + } + isEmpty = false + if pType, ok := r.Properties.StringMap[i.String()]; ok { + used[pType.Enum] = int64(math.Ceil(float64(podResource[i].MilliValue()) / float64(pType.Unit.MilliValue()))) + continue + } + r.Logger.Error(fmt.Errorf("not found resource type"), "resource", i.String()) + } + return isEmpty, used } func (r *MonitorReconciler) getObjStorageUsed(user string, namedMap *map[string]*resources.ResourceNamed, resMap *map[string]map[corev1.ResourceName]*quantity) error { @@ -400,67 +399,48 @@ func (r *MonitorReconciler) getObjStorageUsed(user string, namedMap *map[string] return nil } -//func (r *MonitorReconciler) getObjStorageUsed(user string, namedMap *map[string]*resources.ResourceNamed, resMap *map[string]map[corev1.ResourceName]*quantity) error { -// size, count, err := objstorage.GetUserObjectStorageSize(r.ObjStorageClient, user) -// if err != nil { -// return fmt.Errorf("failed to get object storage user storage size: %w", err) -// } -// if count == 0 || size == 0 { -// return nil -// } -// bytes, err := objstorage.GetUserObjectStorageFlow(r.ObjStorageClient, r.PromURL, user) -// if err != nil { -// return fmt.Errorf("failed to get object storage user storage flow: %w", err) -// } -// objStorageNamed := resources.NewObjStorageResourceNamed() -// (*namedMap)[objStorageNamed.Name()] = objStorageNamed -// if _, ok := (*resMap)[objStorageNamed.Name()]; !ok { -// (*resMap)[objStorageNamed.Name()] = initResources() -// } -// (*resMap)[objStorageNamed.Name()][corev1.ResourceStorage].Add(*resource.NewQuantity(size, resource.BinarySI)) -// (*resMap)[objStorageNamed.Name()][resources.ResourceNetwork].Add(*resource.NewQuantity(bytes, resource.BinarySI)) -// return nil -//} - -func (r *MonitorReconciler) getPodTrafficUsed(namespace string, namedMap *map[string]*resources.ResourceNamed, podsRes *map[string]map[corev1.ResourceName]*quantity) error { - conn, err := grpc.Dial(r.TrafficSvcConn, grpc.WithTransportCredentials(insecure.NewCredentials())) +func (r *MonitorReconciler) MonitorPodTrafficUsed(startTime, endTime time.Time) error { + namespaceList, err := r.getNamespaceList() if err != nil { - return fmt.Errorf("dial grpc failed: %w", err) + return fmt.Errorf("failed to list namespaces") } - defer conn.Close() - - infoSvc := sealos_networkmanager.NewInfoServiceClient(conn) - tType := sealos_networkmanager.TrafficType_IPv4Egress - var labelsNotIn []*sealos_networkmanager.Label - //logger.Info("namespace", namespace) - for _, named := range *namedMap { - tfs := sealos_networkmanager.TrafficStatRequest{ - Namespace: namespace, - Type: &tType, - LabelsIn: named.GetInLabels(), - LabelsNotIn: labelsNotIn, - LabelsNotExists: named.GetNotExistLabels(), + logger.Info("start getPodTrafficUsed", "startTime", startTime.Format(time.RFC3339), "endTime", endTime.Format(time.RFC3339)) + for _, namespace := range namespaceList.Items { + if err := r.monitorPodTrafficUsed(namespace, startTime, endTime); err != nil { + r.Logger.Error(err, "failed to monitor pod traffic used", "namespace", namespace.Name) } - cli, err := infoSvc.GetTrafficStat(context.Background(), &tfs) + } + return nil +} + +func (r *MonitorReconciler) monitorPodTrafficUsed(namespace corev1.Namespace, startTime, endTime time.Time) error { + monitors, err := r.DBClient.GetDistinctMonitorCombinations(startTime, endTime, namespace.Name) + if err != nil { + return fmt.Errorf("failed to get distinct monitor combinations: %w", err) + } + for _, monitor := range monitors { + bytes, err := r.TrafficClient.GetTrafficSentBytes(startTime, endTime, namespace.Name, monitor.Type, monitor.Name) if err != nil { - return fmt.Errorf("get traffic stat failed: %w", err) + return fmt.Errorf("failed to get traffic sent bytes: %w", err) } - //logger.Info("traffic stat", "named", named.String(), " labelsIn", named.GetInLabels(), " labelsNotIn", labelsNotIn, " labelsNotExists", named.GetNotExistLabels()) - for { - rsp, err := cli.Recv() - if err != nil { - if err == io.EOF { - break - } - return fmt.Errorf("recv traffic stat failed: %w", err) - } - if rsp.Bytes == nil { - continue - } - (*podsRes)[named.String()][resources.ResourceNetwork].Add(*resource.NewQuantity(int64(*rsp.Bytes), resource.BinarySI)) - //r.Logger.Info("traffic rsp", "rsp.pod", *rsp.Pod, "rsp.bytes", *rsp.Bytes, "rsp.identity", rsp.Identity) + unit := r.Properties.StringMap[resources.ResourceNetwork].Unit + used := int64(math.Ceil(float64(resource.NewQuantity(bytes, resource.BinarySI).MilliValue()) / float64(unit.MilliValue()))) + if used == 0 { + continue + } + logger.Info("traffic used ", "monitor", monitor, "used", used, "unit", unit, "bytes", bytes) + ro := resources.Monitor{ + Category: namespace.Name, + Name: monitor.Name, + Used: map[uint8]int64{r.Properties.StringMap[resources.ResourceNetwork].Enum: used}, + Time: endTime.Add(-1 * time.Minute), + Type: monitor.Type, + } + r.Logger.Info("monitor traffic used", "monitor", ro) + err = r.DBClient.InsertMonitor(context.Background(), &ro) + if err != nil { + return fmt.Errorf("failed to insert monitor: %w", err) } - labelsNotIn = append(labelsNotIn, named.GetInLabels()...) } return nil } diff --git a/controllers/resources/controllers/monitor_controller_test.go b/controllers/resources/controllers/monitor_controller_test.go index f7fb3e07efb..59ca1fac3c0 100644 --- a/controllers/resources/controllers/monitor_controller_test.go +++ b/controllers/resources/controllers/monitor_controller_test.go @@ -13,44 +13,3 @@ // limitations under the License. package controllers - -import ( - "context" - "fmt" - "testing" - - corev1 "k8s.io/api/core/v1" - "k8s.io/client-go/kubernetes/scheme" - "sigs.k8s.io/controller-runtime/pkg/client" - - controllerruntime "sigs.k8s.io/controller-runtime" -) - -func BenchmarkNewMonitorReconciler(b *testing.B) { - // 1. 初始化 MonitorReconciler 和 Kubernetes client - m := &MonitorReconciler{} - cfg, err := controllerruntime.GetConfig() - if err != nil { - b.Fatal(fmt.Errorf("failed to get kubernetes config: %v", err)) - } - m.Client, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) - if err != nil { - b.Fatal(fmt.Errorf("failed to create kubernetes client: %v", err)) - } - m.initNamespaceFuncs() - - // 2. 列出所有命名空间 - ctx := context.Background() - namespaceList := &corev1.NamespaceList{} - if err := m.Client.List(ctx, namespaceList); err != nil { - b.Fatal(err, "failed to list namespaces") - } - - // 3. 基准测试 - b.ResetTimer() // 重置计时器 - for i := 0; i < 1; i++ { - if err := m.processNamespaceList(ctx, namespaceList); err != nil { - b.Fatal(err, "failed to process namespace list") - } - } -} diff --git a/controllers/resources/deploy/Kubefile b/controllers/resources/deploy/Kubefile index c451bfbf8f2..ac7a04474ef 100644 --- a/controllers/resources/deploy/Kubefile +++ b/controllers/resources/deploy/Kubefile @@ -7,7 +7,7 @@ COPY manifests manifests ENV DEFAULT_NAMESPACE resources-system ENV MONGO_URI "mongodb://mongo:27017/resources" -ENV TRAFFICS_SERVICE_CONNECT_ADDRESS "sealos-networkmanager-info-service.sealos-networkmanager-system:50051" +ENV TRAFFIC_MONGO_URI "mongodb://mongo:27017/traffic" CMD ["kubectl apply -f manifests/deploy.yaml -f manifests/deploy-manager.yaml -n $DEFAULT_NAMESPACE && ( kubectl create -f manifests/mongo-secret.yaml -n $DEFAULT_NAMESPACE || true )"] diff --git a/controllers/resources/deploy/manifests/deploy-manager.yaml b/controllers/resources/deploy/manifests/deploy-manager.yaml index 934ba4e3276..818a96b1529 100644 --- a/controllers/resources/deploy/manifests/deploy-manager.yaml +++ b/controllers/resources/deploy/manifests/deploy-manager.yaml @@ -69,12 +69,6 @@ spec: secretKeyRef: key: MONGO_URI name: mongo-secret - - name: TRAFFICS_SERVICE_CONNECT_ADDRESS - valueFrom: - secretKeyRef: - key: TRAFFICS_SERVICE_CONNECT_ADDRESS - name: mongo-secret - optional: true image: ghcr.io/labring/sealos-resources-controller:latest imagePullPolicy: Always livenessProbe: diff --git a/controllers/resources/main.go b/controllers/resources/main.go index a70b73a5cb4..0bd1829ee4a 100644 --- a/controllers/resources/main.go +++ b/controllers/resources/main.go @@ -123,6 +123,21 @@ func main() { setupLog.Error(err, "failed to disconnect db client") } }() + if trafficURI := os.Getenv(database.TrafficMongoURI); trafficURI != "" { + reconciler.TrafficClient, err = mongo.NewMongoInterface(context.Background(), trafficURI) + if err != nil { + setupLog.Error(err, "failed to init traffic db client") + os.Exit(1) + } + defer func() { + if err := reconciler.TrafficClient.Disconnect(context.Background()); err != nil { + setupLog.Error(err, "failed to disconnect traffic db client") + } + }() + } else { + setupLog.Info("traffic mongo uri not found, please check env: TRAFFIC_MONGO_URI") + } + err = reconciler.DBClient.InitDefaultPropertyTypeLS() if err != nil { setupLog.Error(err, "failed to get property type")