Skip to content

Commit

Permalink
Add random delay to ArchiveExecutionTask
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Dec 5, 2022
1 parent 4907ff9 commit 8cc6fb8
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 96 deletions.
3 changes: 3 additions & 0 deletions common/backoff/jitter.go
Expand Up @@ -43,6 +43,9 @@ func JitInt64(input int64, coefficient float64) int64 {
if input == 0 {
return 0
}
if coefficient == 0 {
return input
}

base := int64(float64(input) * (1 - coefficient))
addon := rand.Int63n(2 * (input - base))
Expand Down
4 changes: 4 additions & 0 deletions common/dynamicconfig/constants.go
Expand Up @@ -550,6 +550,10 @@ const (
// ArchivalProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for
// archivalQueueProcessor
ArchivalProcessorPollBackoffInterval = "history.archivalProcessorPollBackoffInterval"
// ArchivalProcessorArchiveDelay is the base delay, applied with jitter after a workflow closes, before its archive
// execution task becomes eligible for processing by archivalQueueProcessor
ArchivalProcessorArchiveDelay = "history.archivalProcessorArchiveDelay"
// ArchivalProcessorArchiveDelayJitterCoefficient is the archival delay jitter coefficient
ArchivalProcessorArchiveDelayJitterCoefficient = "history.archivalProcessorArchiveDelayJitterCoefficient"

// ReplicatorTaskBatchSize is batch size for ReplicatorProcessor
ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize"
Expand Down
15 changes: 10 additions & 5 deletions service/history/configs/config.go
Expand Up @@ -186,10 +186,11 @@ type Config struct {
NumParentClosePolicySystemWorkflows dynamicconfig.IntPropertyFn

// Archival settings
NumArchiveSystemWorkflows dynamicconfig.IntPropertyFn
ArchiveRequestRPS dynamicconfig.IntPropertyFn
ArchiveSignalTimeout dynamicconfig.DurationPropertyFn
DurableArchivalEnabled dynamicconfig.BoolPropertyFn
NumArchiveSystemWorkflows dynamicconfig.IntPropertyFn
ArchiveRequestRPS dynamicconfig.IntPropertyFn
ArchiveSignalTimeout dynamicconfig.DurationPropertyFn
DurableArchivalEnabled dynamicconfig.BoolPropertyFn
RandomArchiveExecutionDelayUpperBound dynamicconfig.DurationPropertyFn

// Size limit related settings
BlobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
Expand Down Expand Up @@ -303,6 +304,8 @@ type Config struct {
ArchivalProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
ArchivalProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
ArchivalProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
ArchivalProcessorArchiveDelay dynamicconfig.DurationPropertyFn
ArchivalProcessorArchiveDelayJitterCoefficient dynamicconfig.FloatPropertyFn
}

const (
Expand Down Expand Up @@ -540,7 +543,9 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
ArchivalProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorUpdateAckInterval, 30*time.Second),
ArchivalProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.
ArchivalProcessorUpdateAckIntervalJitterCoefficient, 0.15),
ArchivalProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorPollBackoffInterval, 5*time.Second),
ArchivalProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorPollBackoffInterval, 5*time.Second),
ArchivalProcessorArchiveDelay: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorArchiveDelay, 5*time.Minute),
ArchivalProcessorArchiveDelayJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ArchivalProcessorArchiveDelayJitterCoefficient, 1.0),
}

return cfg
Expand Down
2 changes: 1 addition & 1 deletion service/history/tasks/category.go
Expand Up @@ -94,7 +94,7 @@ var (

CategoryArchival = Category{
id: CategoryIDArchival,
cType: CategoryTypeImmediate,
cType: CategoryTypeScheduled,
name: CategoryNameArchival,
}
)
Expand Down
16 changes: 13 additions & 3 deletions service/history/workflow/task_generator.go
Expand Up @@ -198,10 +198,20 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
},
)
if r.config.DurableArchivalEnabled() {
delay := backoff.JitDuration(
r.config.ArchivalProcessorArchiveDelay(),
r.config.ArchivalProcessorArchiveDelayJitterCoefficient(),
) / 2
if delay > retention {
delay = retention
}

archiveTime := closeEvent.GetEventTime().Add(delay)
closeTasks = append(closeTasks, &tasks.ArchiveExecutionTask{
// TaskID and VisibilityTimestamp are set by the shard
WorkflowKey: r.mutableState.GetWorkflowKey(),
Version: currentVersion,
// TaskID is set by the shard
WorkflowKey: r.mutableState.GetWorkflowKey(),
VisibilityTimestamp: archiveTime,
Version: currentVersion,
})
} else {
closeTime := timestamp.TimeValue(closeEvent.GetEventTime())
Expand Down
251 changes: 164 additions & 87 deletions service/history/workflow/task_generator_test.go
Expand Up @@ -59,66 +59,128 @@ import (

"go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/tests"
)

// testConfig describes one table-driven case for
// TestTaskGeneratorImpl_GenerateWorkflowCloseTasks.
type testConfig struct {
// Name is the subtest name passed to t.Run.
Name string
// ConfigFn mutates the default testParams built by the test, overriding only
// the inputs and expectations that matter for this case.
ConfigFn func(config *testParams)
}

// testParams holds the inputs fed to the task generator under test and the
// expectations checked against the tasks it generates. The test creates one
// with default values per case and then lets the case's ConfigFn adjust it.
type testParams struct {
// DurableArchivalEnabled is the value returned by the config's
// DurableArchivalEnabled function.
DurableArchivalEnabled bool
// DeleteAfterClose is passed to GenerateWorkflowCloseTasks and is expected to
// be copied onto the generated CloseExecutionTask.
DeleteAfterClose bool
// CloseEventTime is used as the EventTime of the workflow close event.
CloseEventTime time.Time
// Retention is the retention applied to the namespace entry used by the test.
Retention time.Duration
// Logger is the mock logger created for the test case.
Logger *log.MockLogger
// ArchivalProcessorArchiveDelay is the value returned by the config's
// ArchivalProcessorArchiveDelay function; the test pins the jitter
// coefficient to 0.
ArchivalProcessorArchiveDelay time.Duration

// The Expect* flags state whether a task of the given type must be present
// (true) or absent (false) among the generated tasks.
ExpectCloseExecutionVisibilityTask bool
ExpectArchiveExecutionTask bool
ExpectDeleteHistoryEventTask bool
// ExpectedArchiveExecutionTaskVisibilityTimestamp is compared against the
// VisibilityTimestamp of the ArchiveExecutionTask; it is only checked when
// ExpectArchiveExecutionTask is true.
ExpectedArchiveExecutionTaskVisibilityTimestamp time.Time
}

func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) {
for _, c := range []struct {
Name string
DurableArchivalEnabled bool
DeleteAfterClose bool
ExpectCloseExecutionVisibilityTask bool
ExpectArchiveExecutionTask bool
ExpectDeleteHistoryEventTask bool
}{
for _, c := range []testConfig{
{
Name: "delete after retention",
ConfigFn: func(p *testParams) {
p.ExpectCloseExecutionVisibilityTask = true
p.ExpectDeleteHistoryEventTask = true
},
},
{
Name: "Delete after retention",
DurableArchivalEnabled: false,
DeleteAfterClose: false,
Name: "use archival queue",
ConfigFn: func(p *testParams) {
p.DurableArchivalEnabled = true

ExpectCloseExecutionVisibilityTask: true,
ExpectDeleteHistoryEventTask: true,
ExpectArchiveExecutionTask: false,
p.ExpectCloseExecutionVisibilityTask = true
p.ExpectArchiveExecutionTask = true
},
},
{
Name: "Use archival queue",
DurableArchivalEnabled: true,
DeleteAfterClose: false,
Name: "delete after close",
ConfigFn: func(p *testParams) {
p.DurableArchivalEnabled = true

ExpectCloseExecutionVisibilityTask: true,
ExpectDeleteHistoryEventTask: false,
ExpectArchiveExecutionTask: true,
p.ExpectCloseExecutionVisibilityTask = true
p.ExpectArchiveExecutionTask = true
},
},
{
Name: "DeleteAfterClose",
DurableArchivalEnabled: false,
DeleteAfterClose: true,
Name: "delete after close ignores durable execution flag",
ConfigFn: func(p *testParams) {
p.DurableArchivalEnabled = true
p.DeleteAfterClose = true
},
},
{
Name: "delay is zero",
ConfigFn: func(p *testParams) {
p.DurableArchivalEnabled = true
p.CloseEventTime = time.Unix(0, 0)
p.Retention = 24 * time.Hour
p.ArchivalProcessorArchiveDelay = 0

ExpectCloseExecutionVisibilityTask: false,
ExpectDeleteHistoryEventTask: false,
ExpectArchiveExecutionTask: false,
p.ExpectedArchiveExecutionTaskVisibilityTimestamp = time.Unix(0, 0)
p.ExpectCloseExecutionVisibilityTask = true
p.ExpectArchiveExecutionTask = true
},
},
{
Name: "DeleteAfterClose ignores durable execution flag",
DurableArchivalEnabled: true,
DeleteAfterClose: true,
Name: "delay exceeds retention",
ConfigFn: func(p *testParams) {
p.DurableArchivalEnabled = true
p.CloseEventTime = time.Unix(0, 0)
p.Retention = 24 * time.Hour
p.ArchivalProcessorArchiveDelay = 48*time.Hour + time.Second

ExpectCloseExecutionVisibilityTask: false,
ExpectDeleteHistoryEventTask: false,
ExpectArchiveExecutionTask: false,
p.ExpectedArchiveExecutionTaskVisibilityTimestamp = time.Unix(0, 0).Add(24 * time.Hour)
p.ExpectCloseExecutionVisibilityTask = true
p.ExpectArchiveExecutionTask = true
},
},
{
Name: "delay is less than retention",
ConfigFn: func(p *testParams) {
p.DurableArchivalEnabled = true
p.CloseEventTime = time.Unix(0, 0)
p.Retention = 24 * time.Hour
p.ArchivalProcessorArchiveDelay = 12 * time.Hour

p.ExpectedArchiveExecutionTaskVisibilityTimestamp = time.Unix(0, 0).Add(6 * time.Hour)
p.ExpectCloseExecutionVisibilityTask = true
p.ExpectArchiveExecutionTask = true
},
},
} {
c := c
t.Run(c.Name, func(t *testing.T) {
t.Parallel()
// t.Parallel()
now := time.Unix(0, 0).UTC()
ctrl := gomock.NewController(t)
mockLogger := log.NewMockLogger(ctrl)
p := testParams{
DurableArchivalEnabled: false,
DeleteAfterClose: false,
CloseEventTime: now,
Retention: time.Hour * 24 * 7,
Logger: mockLogger,

ExpectCloseExecutionVisibilityTask: false,
ExpectArchiveExecutionTask: false,
ExpectDeleteHistoryEventTask: false,
ExpectedArchiveExecutionTaskVisibilityTimestamp: now,
}
c.ConfigFn(&p)
namespaceRegistry := namespace.NewMockRegistry(ctrl)
retention := 24 * time.Hour
namespaceEntry := tests.GlobalNamespaceEntry.Clone(namespace.WithRetention(&retention))
namespaceEntry := tests.GlobalNamespaceEntry.Clone(namespace.WithRetention(&p.Retention))
namespaceRegistry.EXPECT().GetNamespaceID(gomock.Any()).Return(namespaceEntry.ID(), nil).AnyTimes()
namespaceRegistry.EXPECT().GetNamespaceByID(namespaceEntry.ID()).Return(namespaceEntry, nil).AnyTimes()

Expand All @@ -131,71 +193,86 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) {
namespaceEntry.ID().String(), tests.WorkflowID, tests.RunID,
)).AnyTimes()
mutableState.EXPECT().GetCurrentBranchToken().Return(nil, nil)
taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, &configs.Config{
retentionTimerDelay := time.Second
cfg := &configs.Config{
DurableArchivalEnabled: func() bool {
return c.DurableArchivalEnabled
return p.DurableArchivalEnabled
},
RetentionTimerJitterDuration: func() time.Duration {
return time.Second
return retentionTimerDelay
},
})

ArchivalProcessorArchiveDelay: func() time.Duration {
return p.ArchivalProcessorArchiveDelay
},
ArchivalProcessorArchiveDelayJitterCoefficient: func() float64 {
return 0
},
}
closeTime := time.Unix(0, 0)

var allTasks []tasks.Task
mutableState.EXPECT().AddTasks(gomock.Any()).Do(func(ts ...tasks.Task) {
var (
closeExecutionTask *tasks.CloseExecutionTask
deleteHistoryEventTask *tasks.DeleteHistoryEventTask
closeExecutionVisibilityTask *tasks.CloseExecutionVisibilityTask
archiveExecutionTask *tasks.ArchiveExecutionTask
)
for _, task := range ts {
switch t := task.(type) {
case *tasks.CloseExecutionTask:
closeExecutionTask = t
case *tasks.DeleteHistoryEventTask:
deleteHistoryEventTask = t
case *tasks.CloseExecutionVisibilityTask:
closeExecutionVisibilityTask = t
case *tasks.ArchiveExecutionTask:
archiveExecutionTask = t
}
}
require.NotNil(t, closeExecutionTask)
assert.Equal(t, c.DeleteAfterClose, closeExecutionTask.DeleteAfterClose)

if c.ExpectCloseExecutionVisibilityTask {
assert.NotNil(t, closeExecutionVisibilityTask)
} else {
assert.Nil(t, closeExecutionVisibilityTask)
}
if c.ExpectArchiveExecutionTask {
require.NotNil(t, archiveExecutionTask)
assert.Equal(t, archiveExecutionTask.NamespaceID, namespaceEntry.ID().String())
assert.Equal(t, archiveExecutionTask.WorkflowID, tests.WorkflowID)
assert.Equal(t, archiveExecutionTask.RunID, tests.RunID)
} else {
assert.Nil(t, archiveExecutionTask)
}
if c.ExpectDeleteHistoryEventTask {
require.NotNil(t, deleteHistoryEventTask)
assert.Equal(t, deleteHistoryEventTask.NamespaceID, namespaceEntry.ID().String())
assert.Equal(t, deleteHistoryEventTask.WorkflowID, tests.WorkflowID)
assert.Equal(t, deleteHistoryEventTask.RunID, tests.RunID)
assert.True(t, deleteHistoryEventTask.VisibilityTimestamp.After(closeTime.Add(retention)))
assert.True(t, deleteHistoryEventTask.VisibilityTimestamp.Before(closeTime.Add(retention).Add(time.Second*2)))
} else {
assert.Nil(t, deleteHistoryEventTask)
}
allTasks = append(allTasks, ts...)
})

taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, cfg)
err := taskGenerator.GenerateWorkflowCloseTasks(&historypb.HistoryEvent{
Attributes: &historypb.HistoryEvent_WorkflowExecutionCompletedEventAttributes{
WorkflowExecutionCompletedEventAttributes: &historypb.WorkflowExecutionCompletedEventAttributes{},
},
EventTime: timestamp.TimePtr(closeTime),
}, c.DeleteAfterClose)
EventTime: timestamp.TimePtr(p.CloseEventTime),
}, p.DeleteAfterClose)
require.NoError(t, err)

var (
closeExecutionTask *tasks.CloseExecutionTask
deleteHistoryEventTask *tasks.DeleteHistoryEventTask
closeExecutionVisibilityTask *tasks.CloseExecutionVisibilityTask
archiveExecutionTask *tasks.ArchiveExecutionTask
)
for _, task := range allTasks {
switch t := task.(type) {
case *tasks.CloseExecutionTask:
closeExecutionTask = t
case *tasks.DeleteHistoryEventTask:
deleteHistoryEventTask = t
case *tasks.CloseExecutionVisibilityTask:
closeExecutionVisibilityTask = t
case *tasks.ArchiveExecutionTask:
archiveExecutionTask = t
}
}
require.NotNil(t, closeExecutionTask)
assert.Equal(t, p.DeleteAfterClose, closeExecutionTask.DeleteAfterClose)

if p.ExpectCloseExecutionVisibilityTask {
assert.NotNil(t, closeExecutionVisibilityTask)
} else {
assert.Nil(t, closeExecutionVisibilityTask)
}
if p.ExpectArchiveExecutionTask {
require.NotNil(t, archiveExecutionTask)
assert.Equal(t, archiveExecutionTask.NamespaceID, namespaceEntry.ID().String())
assert.Equal(t, archiveExecutionTask.WorkflowID, tests.WorkflowID)
assert.Equal(t, archiveExecutionTask.RunID, tests.RunID)
assert.Equal(
t,
p.ExpectedArchiveExecutionTaskVisibilityTimestamp,
archiveExecutionTask.VisibilityTimestamp,
)
} else {
assert.Nil(t, archiveExecutionTask)
}
if p.ExpectDeleteHistoryEventTask {
require.NotNil(t, deleteHistoryEventTask)
assert.Equal(t, deleteHistoryEventTask.NamespaceID, namespaceEntry.ID().String())
assert.Equal(t, deleteHistoryEventTask.WorkflowID, tests.WorkflowID)
assert.Equal(t, deleteHistoryEventTask.RunID, tests.RunID)
assert.GreaterOrEqual(t, deleteHistoryEventTask.VisibilityTimestamp, closeTime.Add(p.Retention))
assert.LessOrEqual(t, deleteHistoryEventTask.VisibilityTimestamp,
closeTime.Add(p.Retention).Add(retentionTimerDelay*2))
} else {
assert.Nil(t, deleteHistoryEventTask)
}
})
}
}

0 comments on commit 8cc6fb8

Please sign in to comment.