Skip to content

Commit

Permalink
Add execution scavenger for retention (#3457)
Browse files Browse the repository at this point in the history
* Add execution scavenger for retention
  • Loading branch information
yux0 authored and dnr committed Oct 17, 2022
1 parent bb7b1f4 commit ff47b2c
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 55 deletions.
6 changes: 6 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,12 @@ const (
WorkerScannerMaxConcurrentWorkflowTaskPollers = "worker.ScannerMaxConcurrentWorkflowTaskPollers"
// ScannerPersistenceMaxQPS is the maximum rate of persistence calls from worker.Scanner
ScannerPersistenceMaxQPS = "worker.scannerPersistenceMaxQPS"
// ExecutionScannerPerHostQPS is the maximum rate of calls per host from executions.Scanner
ExecutionScannerPerHostQPS = "worker.executionScannerPerHostQPS"
// ExecutionScannerPerShardQPS is the maximum rate of calls per shard from executions.Scanner
ExecutionScannerPerShardQPS = "worker.executionScannerPerShardQPS"
// ExecutionDataDurationBuffer is the extra grace period added on top of namespace retention before execution data is reported as past retention
ExecutionDataDurationBuffer = "worker.executionDataDurationBuffer"
// TaskQueueScannerEnabled indicates if task queue scanner should be started as part of worker.Scanner
TaskQueueScannerEnabled = "worker.taskQueueScannerEnabled"
// HistoryScannerEnabled indicates if history scanner should be started as part of worker.Scanner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,53 @@ package executions
import (
"context"
"fmt"
"time"

"go.temporal.io/api/serviceerror"

"go.temporal.io/server/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/common/primitives/timestamp"
)

const (
mutableStateActivityIDFailureType = "mutable_state_id_validator_activity"
mutableStateTimerIDFailureType = "mutable_state_id_validator_timer"
mutableStateChildWorkflowIDFailureType = "mutable_state_id_validator_child_workflow"
mutableStateRequestCancelIDFailureType = "mutable_state_id_validator_request_cancel"
mutableStateSignalIDFailureType = "mutable_state_id_validator_signal"
mutableStateActivityIDFailureType = "mutable_state_validator_activity"
mutableStateTimerIDFailureType = "mutable_state_validator_timer"
mutableStateChildWorkflowIDFailureType = "mutable_state_validator_child_workflow"
mutableStateRequestCancelIDFailureType = "mutable_state_validator_request_cancel"
mutableStateSignalIDFailureType = "mutable_state_validator_signal"
mutableStateRetentionFailureType = "mutable_state_validator_retention"
)

type (
// mutableStateIDValidator is a validator that does shallow checks that
// mutableStateValidator is a validator that does shallow checks that
// * ID >= common.FirstEventID
// * ID <= last event ID
// * a completed workflow has not outlived its namespace retention plus the configured buffer
mutableStateIDValidator struct{}
mutableStateValidator struct {
registry namespace.Registry
executionDataDurationBuffer dynamicconfig.DurationPropertyFn
}
)

var _ Validator = (*mutableStateIDValidator)(nil)
var _ Validator = (*mutableStateValidator)(nil)

// NewMutableStateIDValidator returns new instance.
func NewMutableStateIDValidator() *mutableStateIDValidator {
return &mutableStateIDValidator{}
// NewMutableStateValidator builds a mutableStateValidator that resolves
// namespaces through registry and reads the retention buffer from
// executionDataDurationBuffer.
func NewMutableStateValidator(
	registry namespace.Registry,
	executionDataDurationBuffer dynamicconfig.DurationPropertyFn,
) *mutableStateValidator {
	validator := new(mutableStateValidator)
	validator.registry = registry
	validator.executionDataDurationBuffer = executionDataDurationBuffer
	return validator
}

// Validate does shallow correctness checks of IDs and of retention in mutable state.
func (v *mutableStateIDValidator) Validate(
func (v *mutableStateValidator) Validate(
ctx context.Context,
mutableState *MutableState,
) ([]MutableStateValidationResult, error) {
Expand Down Expand Up @@ -99,10 +116,21 @@ func (v *mutableStateIDValidator) Validate(
lastItem.GetEventId())...,
)

retentionResult, err := v.validateRetention(
mutableState.GetExecutionInfo(),
mutableState.GetExecutionState().GetState(),
)
if err != nil {
return results, err
}
if retentionResult != nil {
results = append(results, *retentionResult)
}

return results, nil
}

func (v *mutableStateIDValidator) validateActivity(
func (v *mutableStateValidator) validateActivity(
activityInfos map[int64]*persistencespb.ActivityInfo,
lastEventID int64,
) []MutableStateValidationResult {
Expand All @@ -123,7 +151,7 @@ func (v *mutableStateIDValidator) validateActivity(
return results
}

func (v *mutableStateIDValidator) validateTimer(
func (v *mutableStateValidator) validateTimer(
timerInfos map[string]*persistencespb.TimerInfo,
lastEventID int64,
) []MutableStateValidationResult {
Expand All @@ -144,7 +172,7 @@ func (v *mutableStateIDValidator) validateTimer(
return results
}

func (v *mutableStateIDValidator) validateChildWorkflow(
func (v *mutableStateValidator) validateChildWorkflow(
childExecutionInfos map[int64]*persistencespb.ChildExecutionInfo,
lastEventID int64,
) []MutableStateValidationResult {
Expand All @@ -165,7 +193,7 @@ func (v *mutableStateIDValidator) validateChildWorkflow(
return results
}

func (v *mutableStateIDValidator) validateRequestCancel(
func (v *mutableStateValidator) validateRequestCancel(
requestCancelInfos map[int64]*persistencespb.RequestCancelInfo,
lastEventID int64,
) []MutableStateValidationResult {
Expand All @@ -186,7 +214,7 @@ func (v *mutableStateIDValidator) validateRequestCancel(
return results
}

func (v *mutableStateIDValidator) validateSignal(
func (v *mutableStateValidator) validateSignal(
signalInfos map[int64]*persistencespb.SignalInfo,
lastEventID int64,
) []MutableStateValidationResult {
Expand All @@ -207,9 +235,42 @@ func (v *mutableStateIDValidator) validateSignal(
return results
}

func (v *mutableStateIDValidator) validateID(
// validateID reports whether eventID lies in the inclusive range
// [common.FirstEventID, lastEventID].
func (v *mutableStateValidator) validateID(
	eventID int64,
	lastEventID int64,
) bool {
	if eventID < common.FirstEventID {
		return false
	}
	return eventID <= lastEventID
}

// validateRetention checks whether a completed workflow has been kept longer
// than its namespace retention plus the configured buffer.
//
// It returns (nil, nil) when the execution is not in the completed state or
// when it is still within retention. It returns an internal service error
// when a completed workflow has no last update time, and propagates any error
// from the namespace registry lookup. Otherwise it returns a validation result
// of type mutableStateRetentionFailureType.
func (v *mutableStateValidator) validateRetention(
	executionInfo *persistencespb.WorkflowExecutionInfo,
	executionState enums.WorkflowExecutionState,
) (*MutableStateValidationResult, error) {
	if executionState != enums.WORKFLOW_EXECUTION_STATE_COMPLETED {
		return nil, nil
	}

	// The close time is not used because some old workflows do not have one.
	// For a finished workflow the last update time stands in for it.
	lastUpdate := executionInfo.GetLastUpdateTime()
	if lastUpdate == nil {
		return nil, serviceerror.NewInternal("Cannot get last update time from a closed workflow")
	}

	// Time elapsed since the workflow was last updated.
	age := time.Now().UTC().Sub(timestamp.TimeValue(lastUpdate))

	nsID := namespace.ID(executionInfo.GetNamespaceId())
	ns, err := v.registry.GetNamespaceByID(nsID)
	if err != nil {
		return nil, err
	}

	retention := ns.Retention()
	// A non-positive age short-circuits before the buffer is read.
	if age <= 0 || age <= retention+v.executionDataDurationBuffer() {
		return nil, nil
	}

	details := fmt.Sprintf("Workflow Data TTL %s passed retention %s", age, retention)
	return &MutableStateValidationResult{
		failureType:    mutableStateRetentionFailureType,
		failureDetails: details,
	}, nil
}
44 changes: 34 additions & 10 deletions service/worker/scanner/executions/scavenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@
package executions

import (
"context"
"sync"
"sync/atomic"
"time"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/quotas"

"go.temporal.io/server/common"
Expand All @@ -50,12 +54,17 @@ type (
Scavenger struct {
status int32
numHistoryShards int32

executionManager persistence.ExecutionManager
executor executor.Executor
rateLimiter quotas.RateLimiter
metrics metrics.Client
logger log.Logger
activityContext context.Context

executionManager persistence.ExecutionManager
registry namespace.Registry
historyClient historyservice.HistoryServiceClient
executor executor.Executor
rateLimiter quotas.RateLimiter
perShardQPS dynamicconfig.IntPropertyFn
executionDataDurationBuffer dynamicconfig.DurationPropertyFn
metrics metrics.Client
logger log.Logger

stopC chan struct{}
stopWG sync.WaitGroup
Expand All @@ -73,25 +82,36 @@ type (
// - either all executions are processed successfully (or)
// - Stop() method is called to stop the scavenger
func NewScavenger(
activityContext context.Context,
numHistoryShards int32,
perHostQPS dynamicconfig.IntPropertyFn,
perShardQPS dynamicconfig.IntPropertyFn,
executionDataDurationBuffer dynamicconfig.DurationPropertyFn,
executionManager persistence.ExecutionManager,
registry namespace.Registry,
historyClient historyservice.HistoryServiceClient,
metricsClient metrics.Client,
logger log.Logger,
) *Scavenger {
return &Scavenger{
activityContext: activityContext,
numHistoryShards: numHistoryShards,
executionManager: executionManager,
registry: registry,
historyClient: historyClient,
executor: executor.NewFixedSizePoolExecutor(
executorPoolSize,
executorMaxDeferredTasks,
metricsClient,
metrics.ExecutionsScavengerScope,
),
rateLimiter: quotas.NewDefaultOutgoingRateLimiter(
func() float64 { return float64(rateOverall) },
func() float64 { return float64(perHostQPS()) },
),
metrics: metricsClient,
logger: logger,
perShardQPS: perShardQPS,
executionDataDurationBuffer: executionDataDurationBuffer,
metrics: metricsClient,
logger: logger,

stopC: make(chan struct{}),
}
Expand Down Expand Up @@ -145,17 +165,21 @@ func (s *Scavenger) run() {

for shardID := int32(1); shardID <= s.numHistoryShards; shardID++ {
submitted := s.executor.Submit(newTask(
s.activityContext,
shardID,
s.executionManager,
s.registry,
s.historyClient,
s.metrics,
s.logger,
s,
quotas.NewMultiRateLimiter([]quotas.RateLimiter{
quotas.NewDefaultOutgoingRateLimiter(
func() float64 { return float64(ratePerShard) },
func() float64 { return float64(s.perShardQPS()) },
),
s.rateLimiter,
}),
s.executionDataDurationBuffer,
))
if !submitted {
s.logger.Error("unable to submit task to executor", tag.ShardID(shardID))
Expand Down
Loading

0 comments on commit ff47b2c

Please sign in to comment.