Skip to content

Commit

Permalink
Optimize/monitor (#4481)
Browse files Browse the repository at this point in the history
* add Traffic interface

* add sum traffic used

* remove useless code
  • Loading branch information
bxy4543 committed Jan 12, 2024
1 parent 26e06ad commit e819739
Show file tree
Hide file tree
Showing 11 changed files with 399 additions and 403 deletions.
13 changes: 12 additions & 1 deletion controllers/pkg/database/interface.go
Expand Up @@ -29,6 +29,7 @@ import (
type Interface interface {
Account
Auth
Traffic
}

type Auth interface {
Expand All @@ -51,11 +52,20 @@ type Account interface {
GetBillingCount(accountType common.Type, startTime, endTime time.Time) (count, amount int64, err error)
GenerateBillingData(startTime, endTime time.Time, prols *resources.PropertyTypeLS, namespaces []string, owner string) (orderID []string, amount int64, err error)
InsertMonitor(ctx context.Context, monitors ...*resources.Monitor) error
GetDistinctMonitorCombinations(startTime, endTime time.Time, namespace string) ([]resources.Monitor, error)
DropMonitorCollectionsOlderThan(days int) error
Disconnect(ctx context.Context) error
Creator
}

type Traffic interface {
GetTrafficSentBytes(startTime, endTime time.Time, namespace string, _type uint8, name string) (int64, error)
GetTrafficRecvBytes(startTime, endTime time.Time, namespace string, _type uint8, name string) (int64, error)

GetPodTrafficSentBytes(startTime, endTime time.Time, namespace string, name string) (int64, error)
GetPodTrafficRecvBytes(startTime, endTime time.Time, namespace string, name string) (int64, error)
}

type Creator interface {
CreateBillingIfNotExist() error
//suffix by day, eg: monitor_20200101
Expand All @@ -74,7 +84,8 @@ type MeteringOwnerTimeResult struct {
//}

const (
MongoURI = "MONGO_URI"
MongoURI = "MONGO_URI"
TrafficMongoURI = "TRAFFIC_MONGO_URI"
//MongoUsername = "MONGO_USERNAME"
//MongoPassword = "MONGO_PASSWORD"
//RetentionDay = "RETENTION_DAY"
Expand Down
135 changes: 49 additions & 86 deletions controllers/pkg/database/mongo/account.go
Expand Up @@ -36,19 +36,21 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"golang.org/x/sync/errgroup"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
DefaultAccountDBName = "sealos-resources"
DefaultTrafficDBName = "sealos-networkmanager-synchronizer"
DefaultAuthDBName = "sealos-auth"
DefaultMeteringConn = "metering"
DefaultMonitorConn = "monitor"
DefaultBillingConn = "billing"
DefaultUserConn = "user"
DefaultPricesConn = "prices"
DefaultPropertiesConn = "properties"
//TODO fix
DefaultTrafficConn = "traffic"
)

const DefaultRetentionDay = 30
Expand All @@ -61,13 +63,15 @@ var cryptoKey = defaultCryptoKey
type mongoDB struct {
Client *mongo.Client
AccountDB string
TrafficDB string
AuthDB string
UserConn string
MonitorConnPrefix string
MeteringConn string
BillingConn string
PricesConn string
PropertiesConn string
TrafficConn string
}

type AccountBalanceSpecBSON struct {
Expand Down Expand Up @@ -247,6 +251,42 @@ func (m *mongoDB) InsertMonitor(ctx context.Context, monitors ...*resources.Moni
return err
}

func (m *mongoDB) GetDistinctMonitorCombinations(startTime, endTime time.Time, namespace string) ([]resources.Monitor, error) {
pipeline := mongo.Pipeline{
{{Key: "$match", Value: bson.M{
"time": bson.M{
"$gte": startTime.UTC(),
"$lt": endTime.UTC(),
},
"category": namespace,
}}},
{{Key: "$group", Value: bson.M{
"_id": bson.M{
"category": "$category",
"name": "$name",
"type": "$type",
},
}}},
}
cursor, err := m.getMonitorCollection(startTime).Aggregate(context.Background(), pipeline)
if err != nil {
return nil, fmt.Errorf("aggregate error: %v", err)
}
defer cursor.Close(context.Background())
var monitors []resources.Monitor
for cursor.Next(context.Background()) {
var result = make(map[string]resources.Monitor, 1)
if err := cursor.Decode(result); err != nil {
return nil, fmt.Errorf("decode error: %v", err)
}
monitors = append(monitors, result["_id"])
}
if err := cursor.Err(); err != nil {
return nil, fmt.Errorf("cursor error: %v", err)
}
return monitors, nil
}

func (m *mongoDB) GetAllPricesMap() (map[string]resources.Price, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -303,90 +343,6 @@ func (m *mongoDB) SavePropertyTypes(types []resources.PropertyType) error {
return err
}

// 2020-12-01 23:00:00 - 2020-12-02 00:00:00
// 2020-12-02 00:00:00 - 2020-12-02 01:00:00
func (m *mongoDB) GenerateMeteringData(startTime, endTime time.Time, prices map[string]resources.Price) error {
filter := bson.M{
"time": bson.M{
"$gte": startTime,
"$lt": endTime,
},
}
cursor, err := m.getMonitorCollection(startTime).Find(context.Background(), filter)
if err != nil {
return fmt.Errorf("find monitors error: %v", err)
}
defer cursor.Close(context.Background())

meteringMap := make(map[string]map[string]int64)
countMap := make(map[string]map[string]int64)
updateTimeMap := make(map[string]map[string]*time.Time)

for cursor.Next(context.Background()) {
var monitor resources.Monitor
if err := cursor.Decode(&monitor); err != nil {
return fmt.Errorf("decode monitor error: %v", err)
}

if _, ok := updateTimeMap[monitor.Category]; !ok {
updateTimeMap[monitor.Category] = make(map[string]*time.Time)
}
if _, ok := updateTimeMap[monitor.Category][monitor.Property]; !ok {
lastUpdateTime, err := m.GetUpdateTimeForCategoryAndPropertyFromMetering(monitor.Category, monitor.Property)
if err != nil {
logger.Debug(err, "get latest update time failed", "category", monitor.Category, "property", monitor.Property)
}
updateTimeMap[monitor.Category][monitor.Property] = &lastUpdateTime
}
lastUpdateTime := updateTimeMap[monitor.Category][monitor.Property].UTC()

if /* skip last update lte 1 hour*/ lastUpdateTime.Before(startTime) || lastUpdateTime.Equal(startTime) {
if _, ok := meteringMap[monitor.Category]; !ok {
meteringMap[monitor.Category] = make(map[string]int64)
countMap[monitor.Category] = make(map[string]int64)
}
//TODO interface will delete
//meteringMap[monitor.Category][monitor.Property] += monitor.Value
countMap[monitor.Category][monitor.Property]++
continue
}
logger.Debug("Info", "skip metering", "category", monitor.Category, "property", monitor.Property, "lastUpdateTime", updateTimeMap[monitor.Category][monitor.Property].UTC(), "startTime", startTime)
}

if err := cursor.Err(); err != nil {
return fmt.Errorf("cursor error: %v", err)
}
eg, _ := errgroup.WithContext(context.Background())

for category, propertyMap := range meteringMap {
for property, totalValue := range propertyMap {
count := countMap[category][property]
if count < 60 {
count = 60
}
unitValue := math.Ceil(float64(totalValue) / float64(count))
metering := &resources.Metering{
Category: category,
Property: property,
Time: endTime,
Amount: int64(unitValue * float64(prices[property].Price)),
Value: int64(unitValue),
//Detail: "",
}
_category, _property := category, property
eg.Go(func() error {
_, err := m.getMeteringCollection().InsertOne(context.Background(), metering)
if err != nil {
//TODO if insert failed, should todo?
logger.Error(err, "insert metering data failed", "category", _category, "property", _property)
}
return err
})
}
}
return eg.Wait()
}

/*
monitors = append(monitors, &common.Monitor{
Category: namespace.Name,
Expand Down Expand Up @@ -445,9 +401,14 @@ func (m *mongoDB) GenerateBillingData(startTime, endTime time.Time, prols *resou
}}}
continue
}
if value.PriceType == resources.SUM {
groupStage = append(groupStage, primitive.E{Key: keyStr, Value: bson.D{{Key: "$sum", Value: "$used." + keyStr}}})
usedStage[keyStr] = bson.D{{Key: "$toInt", Value: "$" + keyStr}}
continue
}
groupStage = append(groupStage, primitive.E{Key: keyStr, Value: bson.D{{Key: "$sum", Value: "$used." + keyStr}}})
usedStage[keyStr] = bson.D{{Key: "$toInt", Value: bson.D{{Key: "$round", Value: bson.D{{Key: "$divide", Value: bson.A{
"$" + keyStr, bson.D{{Key: "$cond", Value: bson.A{bson.D{{Key: "$gt", Value: bson.A{"$count", minutes}}}, "$count", minutes}}}}}}}}}}
"$" + keyStr, minutes}}}}}}}
}

// add the used phase to the $project phase
Expand Down Expand Up @@ -981,12 +942,14 @@ func NewMongoInterface(ctx context.Context, URL string) (database.Interface, err
return &mongoDB{
Client: client,
AccountDB: DefaultAccountDBName,
TrafficDB: DefaultTrafficDBName,
AuthDB: DefaultAuthDBName,
UserConn: DefaultUserConn,
MeteringConn: DefaultMeteringConn,
MonitorConnPrefix: DefaultMonitorConn,
BillingConn: DefaultBillingConn,
PricesConn: DefaultPricesConn,
PropertiesConn: DefaultPropertiesConn,
TrafficConn: DefaultTrafficConn,
}, err
}
131 changes: 131 additions & 0 deletions controllers/pkg/database/mongo/traffic.go
@@ -0,0 +1,131 @@
// Copyright © 2024 sealos.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mongo

import (
"context"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

/* example:
{
_id: ObjectId("60eea26373c4cdcb6356827d"),
traffic_meta: {
pod_name: "my-pod",
pod_namespace: "my-namespace",
pod_address: "100.64.0.1",
traffic_tag: "port:80",
pod_type: 1,
pod_type_name: "mongodb"
},
timestamp: "2024-01-04T04:02:25",
sent_bytes: 31457280,
recv_bytes: 15728640
}
*/

func (m *mongoDB) GetTrafficRecvBytes(startTime, endTime time.Time, namespace string, _type uint8, name string) (int64, error) {
return m.getTrafficBytes(false, startTime, endTime, namespace, _type, name)
}

func (m *mongoDB) GetTrafficSentBytes(startTime, endTime time.Time, namespace string, _type uint8, name string) (int64, error) {
return m.getTrafficBytes(true, startTime, endTime, namespace, _type, name)
}

func (m *mongoDB) GetPodTrafficSentBytes(startTime, endTime time.Time, namespace string, name string) (int64, error) {
return m.getPodTrafficBytes(true, startTime, endTime, namespace, name)
}

func (m *mongoDB) GetPodTrafficRecvBytes(startTime, endTime time.Time, namespace string, name string) (int64, error) {
return m.getPodTrafficBytes(false, startTime, endTime, namespace, name)
}

func (m *mongoDB) getPodTrafficBytes(sent bool, startTime, endTime time.Time, namespace string, name string) (int64, error) {
filter := bson.M{
"traffic_meta.pod_namespace": namespace,
"traffic_meta.pod_name": name,
"timestamp": bson.M{
"$gte": startTime,
"$lt": endTime,
},
}
pipeline := mongo.Pipeline{
bson.D{{Key: "$match", Value: filter}},
}
if sent {
pipeline = append(pipeline, bson.D{{Key: "$group", Value: bson.D{{Key: "_id", Value: nil}, {Key: "total", Value: bson.D{{Key: "$sum", Value: "$sent_bytes"}}}}}})
} else {
pipeline = append(pipeline, bson.D{{Key: "$group", Value: bson.D{{Key: "_id", Value: nil}, {Key: "total", Value: bson.D{{Key: "$sum", Value: "$recv_bytes"}}}}}})
}
cur, err := m.getTrafficCollection().Aggregate(context.Background(), pipeline)
if err != nil {
return 0, err
}
defer cur.Close(context.Background())
total := int64(0)
for cur.Next(context.Background()) {
var result struct {
Total int64 `bson:"total"`
}
if err := cur.Decode(&result); err != nil {
return 0, err
}
total += result.Total
}
return total, nil
}

func (m *mongoDB) getTrafficBytes(sent bool, startTime, endTime time.Time, namespace string, _type uint8, name string) (int64, error) {
filter := bson.M{
"traffic_meta.pod_namespace": namespace,
"traffic_meta.pod_type": _type,
"traffic_meta.pod_type_name": name,
"timestamp": bson.M{
"$gte": startTime,
"$lte": endTime,
},
}
pipeline := mongo.Pipeline{
bson.D{{Key: "$match", Value: filter}},
}
if sent {
pipeline = append(pipeline, bson.D{{Key: "$group", Value: bson.D{{Key: "_id", Value: nil}, {Key: "total", Value: bson.D{{Key: "$sum", Value: "$sent_bytes"}}}}}})
} else {
pipeline = append(pipeline, bson.D{{Key: "$group", Value: bson.D{{Key: "_id", Value: nil}, {Key: "total", Value: bson.D{{Key: "$sum", Value: "$recv_bytes"}}}}}})
}
cur, err := m.getTrafficCollection().Aggregate(context.Background(), pipeline)
if err != nil {
return 0, err
}
defer cur.Close(context.Background())
total := int64(0)
for cur.Next(context.Background()) {
var result struct {
Total int64 `bson:"total"`
}
if err := cur.Decode(&result); err != nil {
return 0, err
}
total += result.Total
}
return total, nil
}

func (m *mongoDB) getTrafficCollection() *mongo.Collection {
return m.Client.Database(m.TrafficDB).Collection(m.TrafficConn)
}

0 comments on commit e819739

Please sign in to comment.