Skip to content

Commit

Permalink
fix(usage): add missing org usage collection
Browse files Browse the repository at this point in the history
  • Loading branch information
heiruwu committed Feb 20, 2024
1 parent 446f013 commit 239d3f4
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 41 deletions.
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1311,8 +1311,6 @@ github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240215033939-b0bf95f922f1 h1:
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240215033939-b0bf95f922f1/go.mod h1:jhEL0SauySMoPLVvx105DWyThju9sYTbsXIySVCArmM=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a/go.mod h1:EpX3Yr661uWULtZf5UnJHfr5rw2PDyX8ku4Kx0UtYFw=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240216080820-62ace2cbc155 h1:hylVIWcQa0Lkbl8W3OkDBJCrerU2iN+AxFynAZgDrxE=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240216080820-62ace2cbc155/go.mod h1:8lvtZulkhQ7t8alttb2KkLKYoCp5u4oatzDbfFlEld0=
github.com/instill-ai/x v0.3.0-alpha h1:z9fedROOG2dVHhswBfVwU/hzHuq8/JKSUON7inF+FH8=
github.com/instill-ai/x v0.3.0-alpha/go.mod h1:YVYjkbqc5FKatJ+jZa1S/8e4xHkQ8j9V3RYboqBpoR0=
github.com/intel/goresctrl v0.2.0/go.mod h1:+CZdzouYFn5EsxgqAQTEzMfwKwuc0fVdMrT9FCCAVRQ=
Expand Down
38 changes: 11 additions & 27 deletions pkg/external/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,14 @@ package external
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"github.com/instill-ai/model-backend/config"
"github.com/instill-ai/model-backend/pkg/constant"
custom_logger "github.com/instill-ai/model-backend/pkg/logger"

mgmtPB "github.com/instill-ai/protogen-go/core/mgmt/v1beta"
Expand Down Expand Up @@ -68,43 +66,29 @@ func InitMgmtPrivateServiceClient(ctx context.Context) (mgmtPB.MgmtPrivateServic
return mgmtPB.NewMgmtPrivateServiceClient(clientConn), clientConn
}

// InitUsageServiceClient initializes a UsageServiceClient instance
// InitUsageServiceClient initializes a UsageServiceClient instance (no mTLS)
func InitUsageServiceClient(ctx context.Context) (usagePB.UsageServiceClient, *grpc.ClientConn) {
logger, _ := custom_logger.GetZapLogger(ctx)

var clientDialOpts grpc.DialOption
var err error
if config.Config.Server.Usage.TLSEnabled {
roots, err := x509.SystemCertPool()
if err != nil {
logger.Fatal(err.Error())
}

tlsConfig := tls.Config{
RootCAs: roots,
InsecureSkipVerify: true,
NextProtos: []string{"h2"},
}
clientDialOpts = grpc.WithTransportCredentials(credentials.NewTLS(&tlsConfig))
tlsConfig := &tls.Config{}
clientDialOpts = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
} else {
clientDialOpts = grpc.WithTransportCredentials(insecure.NewCredentials())
}

clientConn, err := grpc.Dial(
fmt.Sprintf("%v:%v", config.Config.Server.Usage.Host, config.Config.Server.Usage.Port),
clientDialOpts,
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: 500 * time.Millisecond,
Multiplier: 1.5,
Jitter: 0.2,
MaxDelay: 19 * time.Second,
},
MinConnectTimeout: 5 * time.Second,
}),
clientDialOpts, grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(config.Config.Server.MaxDataSize*constant.MB),
grpc.MaxCallSendMsgSize(config.Config.Server.MaxDataSize*constant.MB),
),
)

if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
return nil, nil
}

return usagePB.NewUsageServiceClient(clientConn), clientConn
Expand Down
2 changes: 1 addition & 1 deletion pkg/service/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (s *service) WriteNewDataPoint(ctx context.Context, data *utils.UsageMetric
return err
}

s.redisClient.RPush(ctx, fmt.Sprintf("user:%s:model.trigger_data", data.OwnerUID), string(bData))
s.redisClient.RPush(ctx, fmt.Sprintf("owner:%s:model.trigger_data", data.OwnerUID), string(bData))
}

return nil
Expand Down
81 changes: 70 additions & 11 deletions pkg/usage/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ func (u *usage) RetrieveUsageData() any {

// Roll over all users and update the metrics with the cached uuid
userPageToken := ""
userPageSizeMax := int32(repository.MaxPageSize)
pageSizeMax := int32(repository.MaxPageSize)
for {
userResp, err := u.mgmtPrivateServiceClient.ListUsersAdmin(ctx, &mgmtPB.ListUsersAdminRequest{
PageSize: &userPageSizeMax,
PageSize: &pageSizeMax,
PageToken: &userPageToken,
})
if err != nil {
Expand All @@ -86,16 +86,16 @@ func (u *usage) RetrieveUsageData() any {
}

// Roll all model resources on a user
for _, user := range userResp.Users {
for _, user := range userResp.GetUsers() {

triggerDataList := []*usagePB.ModelUsageData_UserUsageData_ModelTriggerData{}

triggerCount := u.redisClient.LLen(ctx, fmt.Sprintf("user:%s:model.trigger_data", user.GetUid())).Val() // O(1)
triggerCount := u.redisClient.LLen(ctx, fmt.Sprintf("owner:%s:model.trigger_data", user.GetUid())).Val() // O(1)

if triggerCount != 0 {
for i := int64(0); i < triggerCount; i++ {

strData := u.redisClient.LPop(ctx, fmt.Sprintf("user:%s:model.trigger_data", user.GetUid())).Val()
strData := u.redisClient.LPop(ctx, fmt.Sprintf("owner:%s:model.trigger_data", user.GetUid())).Val()

triggerData := &utils.UsageMetricData{}
if err := json.Unmarshal([]byte(strData), triggerData); err != nil {
Expand All @@ -118,13 +118,12 @@ func (u *usage) RetrieveUsageData() any {
},
)
}
pbModelUsageData = append(pbModelUsageData, &usagePB.ModelUsageData_UserUsageData{
OwnerUid: user.GetUid(),
OwnerType: mgmtPB.OwnerType_OWNER_TYPE_USER,
ModelTriggerData: triggerDataList,
})
}

pbModelUsageData = append(pbModelUsageData, &usagePB.ModelUsageData_UserUsageData{
OwnerUid: user.GetUid(),
OwnerType: mgmtPB.OwnerType_OWNER_TYPE_USER,
ModelTriggerData: triggerDataList,
})
}

if userResp.NextPageToken == "" {
Expand All @@ -134,6 +133,66 @@ func (u *usage) RetrieveUsageData() any {
}
}

// Roll over all orgs and update the metrics with the cached uuid
orgPageToken := ""
for {
orgResp, err := u.mgmtPrivateServiceClient.ListOrganizationsAdmin(ctx, &mgmtPB.ListOrganizationsAdminRequest{
PageSize: &pageSizeMax,
PageToken: &orgPageToken,
})
if err != nil {
logger.Error(fmt.Sprintf("[mgmt-backend: ListOrganizationsAdmin] %s", err))
break
}

// Roll all model resources on an org
for _, org := range orgResp.GetOrganizations() {

triggerDataList := []*usagePB.ModelUsageData_UserUsageData_ModelTriggerData{}

triggerCount := u.redisClient.LLen(ctx, fmt.Sprintf("owner:%s:model.trigger_data", org.GetUid())).Val() // O(1)

if triggerCount != 0 {
for i := int64(0); i < triggerCount; i++ {

strData := u.redisClient.LPop(ctx, fmt.Sprintf("owner:%s:model.trigger_data", org.GetUid())).Val()

triggerData := &utils.UsageMetricData{}
if err := json.Unmarshal([]byte(strData), triggerData); err != nil {
logger.Warn("Usage data might be corrupted")
}

triggerTime, _ := time.Parse(time.RFC3339Nano, triggerData.TriggerTime)

triggerDataList = append(
triggerDataList,
&usagePB.ModelUsageData_UserUsageData_ModelTriggerData{
TriggerUid: triggerData.TriggerUID,
TriggerTime: timestamppb.New(triggerTime),
ModelUid: triggerData.ModelUID,
ModelDefinitionUid: triggerData.ModelDefinitionUID,
ModelTask: triggerData.ModelTask,
Status: triggerData.Status,
UserUid: triggerData.UserUID,
UserType: triggerData.UserType,
},
)
}
pbModelUsageData = append(pbModelUsageData, &usagePB.ModelUsageData_UserUsageData{
OwnerUid: org.GetUid(),
OwnerType: mgmtPB.OwnerType_OWNER_TYPE_ORGANIZATION,
ModelTriggerData: triggerDataList,
})
}
}

if orgResp.NextPageToken == "" {
break
} else {
orgPageToken = orgResp.NextPageToken
}
}

logger.Debug("Send retrieved usage data...")
return &usagePB.SessionReport_ModelUsageData{
ModelUsageData: &usagePB.ModelUsageData{
Expand Down

0 comments on commit 239d3f4

Please sign in to comment.