Skip to content

Commit

Permalink
Add random delay to ArchiveExecutionTask
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Nov 7, 2022
1 parent b300010 commit 75c70ba
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 61 deletions.
9 changes: 5 additions & 4 deletions service/history/configs/config.go
Expand Up @@ -181,10 +181,11 @@ type Config struct {
NumParentClosePolicySystemWorkflows dynamicconfig.IntPropertyFn

// Archival settings
NumArchiveSystemWorkflows dynamicconfig.IntPropertyFn
ArchiveRequestRPS dynamicconfig.IntPropertyFn
ArchiveSignalTimeout dynamicconfig.DurationPropertyFn
DurableArchivalEnabled dynamicconfig.BoolPropertyFn
NumArchiveSystemWorkflows dynamicconfig.IntPropertyFn
ArchiveRequestRPS dynamicconfig.IntPropertyFn
ArchiveSignalTimeout dynamicconfig.DurationPropertyFn
DurableArchivalEnabled dynamicconfig.BoolPropertyFn
RandomArchiveExecutionDelayUpperBound dynamicconfig.DurationPropertyFn

// Size limit related settings
BlobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
Expand Down
2 changes: 1 addition & 1 deletion service/history/tasks/category.go
Expand Up @@ -94,7 +94,7 @@ var (

CategoryArchival = Category{
id: CategoryIDArchival,
cType: CategoryTypeImmediate,
cType: CategoryTypeScheduled,
name: CategoryNameArchival,
}
)
Expand Down
52 changes: 49 additions & 3 deletions service/history/workflow/task_generator.go
Expand Up @@ -28,13 +28,16 @@ package workflow

import (
"fmt"
"math/rand"
"time"

enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/common/primitives/timestamp"
Expand Down Expand Up @@ -101,6 +104,8 @@ type (
namespaceRegistry namespace.Registry
mutableState MutableState
config *configs.Config
randSource rand.Source
logger log.Logger
}
)

Expand All @@ -112,11 +117,15 @@ func NewTaskGenerator(
namespaceRegistry namespace.Registry,
mutableState MutableState,
config *configs.Config,
randSource rand.Source,
logger log.Logger,
) *TaskGeneratorImpl {
return &TaskGeneratorImpl{
namespaceRegistry: namespaceRegistry,
mutableState: mutableState,
config: config,
randSource: randSource,
logger: logger,
}
}

Expand All @@ -141,6 +150,11 @@ func (r *TaskGeneratorImpl) GenerateWorkflowStartTasks(
return nil
}

// The smallest number that the maximum archive execution delay's upper bound can be set to.
// This is to prevent users from setting retention to their namespace to 0 or the dynamic config flag for the archive
// execution delay to 0 because that could cause all of our archival tasks to arrive at the same time.
const minRandomArchiveExecutionDelayUpperBound = time.Hour

func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
closeEvent *historypb.HistoryEvent,
deleteAfterClose bool,
Expand Down Expand Up @@ -197,10 +211,14 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
},
)
if r.config.DurableArchivalEnabled() {
delay := r.getArchivalTaskDelay(namespaceEntry, retention)

archiveTime := closeEvent.GetEventTime().Add(delay)
closeTasks = append(closeTasks, &tasks.ArchiveExecutionTask{
// TaskID and VisibilityTimestamp are set by the shard
WorkflowKey: r.mutableState.GetWorkflowKey(),
Version: currentVersion,
// TaskID is set by the shard
WorkflowKey: r.mutableState.GetWorkflowKey(),
VisibilityTimestamp: archiveTime,
Version: currentVersion,
})
} else {
closeTime := timestamp.TimeValue(closeEvent.GetEventTime())
Expand All @@ -218,6 +236,34 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
return nil
}

func (r *TaskGeneratorImpl) getArchivalTaskDelay(namespaceEntry *namespace.Namespace, retention time.Duration) time.Duration {
delayMaxFromDynamicConfig := r.config.RandomArchiveExecutionDelayUpperBound()
maxDelay := delayMaxFromDynamicConfig
logger := log.With(
r.logger,
tag.WorkflowNamespace(namespaceEntry.Name().String()),
tag.NewStringTag("RandomArchiveExecutionDelayUpperBound", maxDelay.String()),
tag.NewStringTag("retention", retention.String()),
)
if maxDelay > retention {
logger.Warn(
"RandomArchiveExecutionDelayUpperBound dynamic config exceeds the namespace's retention, so it is ignored",
)
maxDelay = retention
}
if maxDelay < minRandomArchiveExecutionDelayUpperBound {
logger.Warn(
fmt.Sprintf("Max archival execution delay %s is too low, so it has been set to %s instead"+
" to avoid archival tasks all ariving at the same time.",
maxDelay, minRandomArchiveExecutionDelayUpperBound),
)
maxDelay = minRandomArchiveExecutionDelayUpperBound
}
randInt63 := r.randSource.Int63()
delay := time.Duration(randInt63 % maxDelay.Nanoseconds())
return delay
}

func (r *TaskGeneratorImpl) GenerateDeleteExecutionTask() (*tasks.DeleteExecutionTask, error) {
return &tasks.DeleteExecutionTask{
// TaskID, VisibilityTimestamp is set by shard
Expand Down
6 changes: 6 additions & 0 deletions service/history/workflow/task_generator_provider.go
Expand Up @@ -25,6 +25,9 @@
package workflow

import (
"math/rand"
"time"

"go.temporal.io/server/service/history/shard"
)

Expand Down Expand Up @@ -52,9 +55,12 @@ func (p *taskGeneratorProviderImpl) NewTaskGenerator(
shard shard.Context,
mutableState MutableState,
) TaskGenerator {
randSource := rand.NewSource(time.Now().Unix())
return NewTaskGenerator(
shard.GetNamespaceRegistry(),
mutableState,
shard.GetConfig(),
randSource,
shard.GetLogger(),
)
}

0 comments on commit 75c70ba

Please sign in to comment.