Skip to content

Commit

Permalink
Add flag to enable execution scanner event Id validator (#4114)
Browse files Browse the repository at this point in the history
* Add flag to enable execution scanner event id validator
  • Loading branch information
yux0 committed Mar 28, 2023
1 parent 47424c0 commit e4a49b2
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 34 deletions.
2 changes: 2 additions & 0 deletions common/dynamicconfig/constants.go
Expand Up @@ -707,6 +707,8 @@ const (
ExecutionDataDurationBuffer = "worker.executionDataDurationBuffer"
// ExecutionScannerWorkerCount is the execution scavenger worker count
ExecutionScannerWorkerCount = "worker.executionScannerWorkerCount"
// ExecutionScannerHistoryEventIdValidator is the flag to enable history event id validator
ExecutionScannerHistoryEventIdValidator = "worker.executionEnableHistoryEventIdValidator"
// TaskQueueScannerEnabled indicates if task queue scanner should be started as part of worker.Scanner
TaskQueueScannerEnabled = "worker.taskQueueScannerEnabled"
// HistoryScannerEnabled indicates if history scanner should be started as part of worker.Scanner
Expand Down
32 changes: 18 additions & 14 deletions service/worker/scanner/executions/scavenger.go
Expand Up @@ -56,16 +56,17 @@ type (
numHistoryShards int32
activityContext context.Context

executionManager persistence.ExecutionManager
registry namespace.Registry
historyClient historyservice.HistoryServiceClient
adminClient adminservice.AdminServiceClient
executor executor.Executor
rateLimiter quotas.RateLimiter
perShardQPS dynamicconfig.IntPropertyFn
executionDataDurationBuffer dynamicconfig.DurationPropertyFn
metricsHandler metrics.Handler
logger log.Logger
executionManager persistence.ExecutionManager
registry namespace.Registry
historyClient historyservice.HistoryServiceClient
adminClient adminservice.AdminServiceClient
executor executor.Executor
rateLimiter quotas.RateLimiter
perShardQPS dynamicconfig.IntPropertyFn
executionDataDurationBuffer dynamicconfig.DurationPropertyFn
enableHistoryEventIDValidator dynamicconfig.BoolPropertyFn
metricsHandler metrics.Handler
logger log.Logger

stopC chan struct{}
stopWG sync.WaitGroup
Expand All @@ -89,6 +90,7 @@ func NewScavenger(
perShardQPS dynamicconfig.IntPropertyFn,
executionDataDurationBuffer dynamicconfig.DurationPropertyFn,
executionTaskWorker dynamicconfig.IntPropertyFn,
enableHistoryEventIDValidator dynamicconfig.BoolPropertyFn,
executionManager persistence.ExecutionManager,
registry namespace.Registry,
historyClient historyservice.HistoryServiceClient,
Expand All @@ -112,10 +114,11 @@ func NewScavenger(
rateLimiter: quotas.NewDefaultOutgoingRateLimiter(
func() float64 { return float64(perHostQPS()) },
),
perShardQPS: perShardQPS,
executionDataDurationBuffer: executionDataDurationBuffer,
metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.ExecutionsScavengerScope)),
logger: logger,
perShardQPS: perShardQPS,
executionDataDurationBuffer: executionDataDurationBuffer,
enableHistoryEventIDValidator: enableHistoryEventIDValidator,
metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.ExecutionsScavengerScope)),
logger: logger,

stopC: make(chan struct{}),
}
Expand Down Expand Up @@ -185,6 +188,7 @@ func (s *Scavenger) run() {
s.rateLimiter,
}),
s.executionDataDurationBuffer,
s.enableHistoryEventIDValidator,
))
if !submitted {
s.logger.Error("unable to submit task to executor", tag.ShardID(shardID))
Expand Down
45 changes: 25 additions & 20 deletions service/worker/scanner/executions/task.go
Expand Up @@ -66,10 +66,11 @@ type (
logger log.Logger
scavenger *Scavenger

ctx context.Context
rateLimiter quotas.RateLimiter
executionDataDurationBuffer dynamicconfig.DurationPropertyFn
paginationToken []byte
ctx context.Context
rateLimiter quotas.RateLimiter
executionDataDurationBuffer dynamicconfig.DurationPropertyFn
enableHistoryEventIDValidator dynamicconfig.BoolPropertyFn
paginationToken []byte
}
)

Expand All @@ -86,6 +87,7 @@ func newTask(
scavenger *Scavenger,
rateLimiter quotas.RateLimiter,
executionDataDurationBuffer dynamicconfig.DurationPropertyFn,
enableHistoryEventIDValidator dynamicconfig.BoolPropertyFn,
) executor.Task {
return &task{
shardID: shardID,
Expand All @@ -98,9 +100,10 @@ func newTask(
logger: logger,
scavenger: scavenger,

ctx: ctx,
rateLimiter: rateLimiter,
executionDataDurationBuffer: executionDataDurationBuffer,
ctx: ctx,
rateLimiter: rateLimiter,
executionDataDurationBuffer: executionDataDurationBuffer,
enableHistoryEventIDValidator: enableHistoryEventIDValidator,
}
}

Expand Down Expand Up @@ -186,19 +189,21 @@ func (t *task) validate(
return results
}

if validationResults, err := NewHistoryEventIDValidator(
t.shardID,
t.executionManager,
).Validate(t.ctx, mutableState); err != nil {
t.logger.Error("unable to validate history event ID being contiguous",
tag.ShardID(t.shardID),
tag.WorkflowNamespaceID(mutableState.GetExecutionInfo().GetNamespaceId()),
tag.WorkflowID(mutableState.GetExecutionInfo().GetWorkflowId()),
tag.WorkflowRunID(mutableState.GetExecutionState().GetRunId()),
tag.Error(err),
)
} else {
results = append(results, validationResults...)
if t.enableHistoryEventIDValidator() {
if validationResults, err := NewHistoryEventIDValidator(
t.shardID,
t.executionManager,
).Validate(t.ctx, mutableState); err != nil {
t.logger.Error("unable to validate history event ID being contiguous",
tag.ShardID(t.shardID),
tag.WorkflowNamespaceID(mutableState.GetExecutionInfo().GetNamespaceId()),
tag.WorkflowID(mutableState.GetExecutionInfo().GetWorkflowId()),
tag.WorkflowRunID(mutableState.GetExecutionState().GetRunId()),
tag.Error(err),
)
} else {
results = append(results, validationResults...)
}
}

return results
Expand Down
2 changes: 2 additions & 0 deletions service/worker/scanner/scanner.go
Expand Up @@ -82,6 +82,8 @@ type (
ExecutionDataDurationBuffer dynamicconfig.DurationPropertyFn
// ExecutionScannerWorkerCount is the execution scavenger task worker number
ExecutionScannerWorkerCount dynamicconfig.IntPropertyFn
// ExecutionScannerHistoryEventIdValidator indicates if the execution scavenger to validate history event id.
ExecutionScannerHistoryEventIdValidator dynamicconfig.BoolPropertyFn
}

// scannerContext is the context object that get's
Expand Down
1 change: 1 addition & 0 deletions service/worker/scanner/workflow.go
Expand Up @@ -196,6 +196,7 @@ func ExecutionsScavengerActivity(
ctx.cfg.ExecutionScannerPerShardQPS,
ctx.cfg.ExecutionDataDurationBuffer,
ctx.cfg.ExecutionScannerWorkerCount,
ctx.cfg.ExecutionScannerHistoryEventIdValidator,
ctx.executionManager,
ctx.namespaceRegistry,
ctx.historyClient,
Expand Down
4 changes: 4 additions & 0 deletions service/worker/service.go
Expand Up @@ -302,6 +302,10 @@ func NewConfig(dc *dynamicconfig.Collection, persistenceConfig *config.Persisten
dynamicconfig.ExecutionScannerWorkerCount,
8,
),
ExecutionScannerHistoryEventIdValidator: dc.GetBoolProperty(
dynamicconfig.ExecutionScannerHistoryEventIdValidator,
true,
),
},
EnableBatcher: dc.GetBoolProperty(dynamicconfig.EnableBatcher, true),
BatcherRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BatcherRPS, batcher.DefaultRPS),
Expand Down

0 comments on commit e4a49b2

Please sign in to comment.