Skip to content

Commit

Permalink
Fix/traffic db name (#4548)
Browse files Browse the repository at this point in the history
* fix custom traffic db name

* optimize traffic speed
  • Loading branch information
bxy4543 committed Mar 4, 2024
1 parent b8fc1d7 commit 14e397c
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 48 deletions.
2 changes: 1 addition & 1 deletion controllers/pkg/database/interface.go
Expand Up @@ -52,7 +52,7 @@ type Account interface {
GetBillingCount(accountType common.Type, startTime, endTime time.Time) (count, amount int64, err error)
GenerateBillingData(startTime, endTime time.Time, prols *resources.PropertyTypeLS, namespaces []string, owner string) (orderID []string, amount int64, err error)
InsertMonitor(ctx context.Context, monitors ...*resources.Monitor) error
GetDistinctMonitorCombinations(startTime, endTime time.Time, namespace string) ([]resources.Monitor, error)
GetDistinctMonitorCombinations(startTime, endTime time.Time) ([]resources.Monitor, error)
DropMonitorCollectionsOlderThan(days int) error
Disconnect(ctx context.Context) error
Creator
Expand Down
37 changes: 23 additions & 14 deletions controllers/pkg/database/mongo/account.go
Expand Up @@ -22,6 +22,8 @@ import (
"strings"
"time"

"github.com/labring/sealos/controllers/pkg/utils/env"

"github.com/labring/sealos/controllers/pkg/common"
"github.com/labring/sealos/controllers/pkg/database"

Expand All @@ -39,9 +41,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
EnvAccountDBName = "ACCOUNT_DB_NAME"
EnvTrafficDBName = "TRAFFIC_DB_NAME"
EnvTrafficConn = "TRAFFIC_CONN"
)

const (
DefaultAccountDBName = "sealos-resources"
DefaultTrafficDBName = "sealos-networkmanager-synchronizer"
DefaultTrafficDBName = "sealos-networkmanager"
DefaultAuthDBName = "sealos-auth"
DefaultMeteringConn = "metering"
DefaultMonitorConn = "monitor"
Expand Down Expand Up @@ -251,14 +259,13 @@ func (m *mongoDB) InsertMonitor(ctx context.Context, monitors ...*resources.Moni
return err
}

func (m *mongoDB) GetDistinctMonitorCombinations(startTime, endTime time.Time, namespace string) ([]resources.Monitor, error) {
func (m *mongoDB) GetDistinctMonitorCombinations(startTime, endTime time.Time) ([]resources.Monitor, error) {
pipeline := mongo.Pipeline{
{{Key: "$match", Value: bson.M{
"time": bson.M{
"$gte": startTime.UTC(),
"$lt": endTime.UTC(),
},
"category": namespace,
}}},
{{Key: "$group", Value: bson.M{
"_id": bson.M{
Expand All @@ -267,21 +274,23 @@ func (m *mongoDB) GetDistinctMonitorCombinations(startTime, endTime time.Time, n
"type": "$type",
},
}}},
{{Key: "$project", Value: bson.M{
"_id": 0,
"category": "$_id.category",
"name": "$_id.name",
"type": "$_id.type",
}}},
}
cursor, err := m.getMonitorCollection(startTime).Aggregate(context.Background(), pipeline)
if err != nil {
return nil, fmt.Errorf("aggregate error: %v", err)
}
defer cursor.Close(context.Background())
var monitors []resources.Monitor
for cursor.Next(context.Background()) {
var result = make(map[string]resources.Monitor, 1)
if err := cursor.Decode(result); err != nil {
return nil, fmt.Errorf("decode error: %v", err)
}
monitors = append(monitors, result["_id"])
if !cursor.Next(context.Background()) {
return nil, nil
}
if err := cursor.Err(); err != nil {
var monitors []resources.Monitor
if err := cursor.All(context.Background(), &monitors); err != nil {
return nil, fmt.Errorf("cursor error: %v", err)
}
return monitors, nil
Expand Down Expand Up @@ -941,15 +950,15 @@ func NewMongoInterface(ctx context.Context, URL string) (database.Interface, err
err = client.Ping(ctx, nil)
return &mongoDB{
Client: client,
AccountDB: DefaultAccountDBName,
TrafficDB: DefaultTrafficDBName,
AccountDB: env.GetEnvWithDefault(EnvAccountDBName, DefaultAccountDBName),
TrafficDB: env.GetEnvWithDefault(EnvTrafficDBName, DefaultTrafficDBName),
AuthDB: DefaultAuthDBName,
UserConn: DefaultUserConn,
MeteringConn: DefaultMeteringConn,
MonitorConnPrefix: DefaultMonitorConn,
BillingConn: DefaultBillingConn,
PricesConn: DefaultPricesConn,
PropertiesConn: DefaultPropertiesConn,
TrafficConn: DefaultTrafficConn,
TrafficConn: env.GetEnvWithDefault(EnvTrafficConn, DefaultTrafficConn),
}, err
}
20 changes: 20 additions & 0 deletions controllers/pkg/database/mongo/account_test.go
Expand Up @@ -594,3 +594,23 @@ func TestMongoDB_SetPropertyTypeLS(t *testing.T) {
// t.Fatalf("failed to save property types: %v", err)
//}
}

func Test_mongoDB_GetDistinctMonitorCombinations(t *testing.T) {
dbCTX := context.Background()

m, err := NewMongoInterface(dbCTX, os.Getenv("MONGODB_URI"))
if err != nil {
t.Errorf("failed to connect mongo: error = %v", err)
}
defer func() {
if err = m.Disconnect(dbCTX); err != nil {
t.Errorf("failed to disconnect mongo: error = %v", err)
}
}()
queryTime := time.Now().UTC()
monitorCombinations, err := m.GetDistinctMonitorCombinations(queryTime.Add(-time.Hour), queryTime)
if err != nil {
t.Fatalf("failed to get distinct monitor combinations: %v", err)
}
t.Logf("monitorCombinations: %v", monitorCombinations)
}
75 changes: 42 additions & 33 deletions controllers/resources/controllers/monitor_controller.go
Expand Up @@ -24,6 +24,8 @@ import (
"sync"
"time"

"golang.org/x/sync/errgroup"

"github.com/labring/sealos/controllers/pkg/utils/env"

"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -400,47 +402,54 @@ func (r *MonitorReconciler) getObjStorageUsed(user string, namedMap *map[string]
}

func (r *MonitorReconciler) MonitorPodTrafficUsed(startTime, endTime time.Time) error {
namespaceList, err := r.getNamespaceList()
if err != nil {
return fmt.Errorf("failed to list namespaces")
}
logger.Info("start getPodTrafficUsed", "startTime", startTime.Format(time.RFC3339), "endTime", endTime.Format(time.RFC3339))
for _, namespace := range namespaceList.Items {
if err := r.monitorPodTrafficUsed(namespace, startTime, endTime); err != nil {
r.Logger.Error(err, "failed to monitor pod traffic used", "namespace", namespace.Name)
}
execTime := time.Now().UTC()
if err := r.monitorPodTrafficUsed(startTime, endTime); err != nil {
r.Logger.Error(err, "failed to monitor pod traffic used")
}
r.Logger.Info("success to monitor pod traffic used", "startTime", startTime.Format(time.RFC3339), "endTime", endTime.Format(time.RFC3339), "execTime", time.Since(execTime).String())
return nil
}

func (r *MonitorReconciler) monitorPodTrafficUsed(namespace corev1.Namespace, startTime, endTime time.Time) error {
monitors, err := r.DBClient.GetDistinctMonitorCombinations(startTime, endTime, namespace.Name)
func (r *MonitorReconciler) monitorPodTrafficUsed(startTime, endTime time.Time) error {
monitors, err := r.DBClient.GetDistinctMonitorCombinations(startTime, endTime)
if err != nil {
return fmt.Errorf("failed to get distinct monitor combinations: %w", err)
}
for _, monitor := range monitors {
bytes, err := r.TrafficClient.GetTrafficSentBytes(startTime, endTime, namespace.Name, monitor.Type, monitor.Name)
if err != nil {
return fmt.Errorf("failed to get traffic sent bytes: %w", err)
}
unit := r.Properties.StringMap[resources.ResourceNetwork].Unit
used := int64(math.Ceil(float64(resource.NewQuantity(bytes, resource.BinarySI).MilliValue()) / float64(unit.MilliValue())))
if used == 0 {
continue
}
logger.Info("traffic used ", "monitor", monitor, "used", used, "unit", unit, "bytes", bytes)
ro := resources.Monitor{
Category: namespace.Name,
Name: monitor.Name,
Used: map[uint8]int64{r.Properties.StringMap[resources.ResourceNetwork].Enum: used},
Time: endTime.Add(-1 * time.Minute),
Type: monitor.Type,
}
r.Logger.Info("monitor traffic used", "monitor", ro)
err = r.DBClient.InsertMonitor(context.Background(), &ro)
if err != nil {
return fmt.Errorf("failed to insert monitor: %w", err)
}
r.Logger.Info("distinct monitor combinations", "monitors len", len(monitors))
wg, _ := errgroup.WithContext(context.Background())
wg.SetLimit(100)
for i := range monitors {
monitor := monitors[i]
wg.Go(func() error {
return r.handlerTrafficUsed(startTime, endTime, monitor)
})
}
return wg.Wait()
}

func (r *MonitorReconciler) handlerTrafficUsed(startTime, endTime time.Time, monitor resources.Monitor) error {
bytes, err := r.TrafficClient.GetTrafficSentBytes(startTime, endTime, monitor.Category, monitor.Type, monitor.Name)
if err != nil {
return fmt.Errorf("failed to get traffic sent bytes: %w", err)
}
unit := r.Properties.StringMap[resources.ResourceNetwork].Unit
used := int64(math.Ceil(float64(resource.NewQuantity(bytes, resource.BinarySI).MilliValue()) / float64(unit.MilliValue())))
if used == 0 {
return nil
}
//logger.Info("traffic used ", "monitor", monitor, "used", used, "unit", unit, "bytes", bytes)
ro := resources.Monitor{
Category: monitor.Category,
Name: monitor.Name,
Used: map[uint8]int64{r.Properties.StringMap[resources.ResourceNetwork].Enum: used},
Time: endTime.Add(-1 * time.Minute),
Type: monitor.Type,
}
r.Logger.Info("monitor traffic used", "monitor", ro)
err = r.DBClient.InsertMonitor(context.Background(), &ro)
if err != nil {
return fmt.Errorf("failed to insert monitor: %w", err)
}
return nil
}
Expand Down

0 comments on commit 14e397c

Please sign in to comment.