Skip to content

Commit

Permalink
fix: calculate the trigger_count with batch_size (#338)
Browse files Browse the repository at this point in the history
Because

- we need to use the batch_size to increment the trigger_count

This commit

- calculate the trigger_count with batch_size
  • Loading branch information
donch1989 committed Dec 20, 2023
1 parent 6e282eb commit 423e6c9
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 3 deletions.
1 change: 1 addition & 0 deletions pkg/middleware/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func AsGRPCError(err error) error {
errors.Is(err, handler.ErrCheckUpdateImmutableFields),
errors.Is(err, handler.ErrCheckOutputOnlyFields),
errors.Is(err, handler.ErrCheckRequiredFields),
errors.Is(err, service.ErrExceedMaxBatchSize),
errors.Is(err, handler.ErrFieldMask),
errors.Is(err, handler.ErrResourceID),
errors.Is(err, handler.ErrSematicVersion),
Expand Down
1 change: 1 addition & 0 deletions pkg/service/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ var ErrRateLimiting = errors.New("rate limiting")
var ErrNamespaceTriggerQuotaExceed = errors.New("namespace trigger quota exceed")
var ErrNamespacePrivatePipelineQuotaExceed = errors.New("namespace private pipeline quota exceed")
var ErrCanNotTriggerNonLatestPipelineRelease = errors.New("can not trigger non-latest pipeline release")
var ErrExceedMaxBatchSize = errors.New("the batch size can not exceed 32")
8 changes: 6 additions & 2 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,10 @@ func (s *service) UpdateNamespacePipelineIDByID(ctx context.Context, ns resource

func (s *service) preTriggerPipeline(ctx context.Context, isPublic bool, ns resource.Namespace, authUser *AuthUser, recipe *datamodel.Recipe, pipelineInputs []*structpb.Struct) error {

batchSize := len(pipelineInputs)
if batchSize > constant.MaxBatchSize {
return ErrExceedMaxBatchSize
}
if isPublic {
value, err := s.redisClient.Get(context.Background(), fmt.Sprintf("user_rate_limit:user:%s", authUser.UID)).Result()
// TODO: use a more robust way to check key exist
Expand All @@ -958,7 +962,7 @@ func (s *service) preTriggerPipeline(ctx context.Context, isPublic bool, ns reso
return err
}
} else {
if resp.Subscription.Quota.PrivatePipelineTrigger.Remain == 0 {
if resp.Subscription.Quota.PrivatePipelineTrigger.Remain-int32(batchSize) < 0 {
return ErrNamespaceTriggerQuotaExceed
}
}
Expand All @@ -977,7 +981,7 @@ func (s *service) preTriggerPipeline(ctx context.Context, isPublic bool, ns reso
return err
}
} else {
if resp.Subscription.Quota.PrivatePipelineTrigger.Remain == 0 {
if resp.Subscription.Quota.PrivatePipelineTrigger.Remain-int32(batchSize) < 0 {
return ErrNamespaceTriggerQuotaExceed
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/worker/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,9 +604,10 @@ func (w *worker) TriggerPipelineWorkflow(ctx workflow.Context, param *TriggerPip
dataPoint.Status = mgmtPB.Status_STATUS_COMPLETED

if !param.IsPublic {
w.redisClient.Incr(
w.redisClient.IncrBy(
context.Background(),
fmt.Sprintf("private_trigger_count:%s", param.OwnerPermalink),
int64(batchSize),
)
}

Expand Down

0 comments on commit 423e6c9

Please sign in to comment.