Skip to content

Commit

Permalink
Add an archival queue factory
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Dec 9, 2022
1 parent 84a8be9 commit d082323
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 10 deletions.
5 changes: 2 additions & 3 deletions common/dynamicconfig/constants.go
Expand Up @@ -534,9 +534,6 @@ const (
// ArchivalProcessorSchedulerWorkerCount is the number of workers in the host level task scheduler for
// archivalQueueProcessor
ArchivalProcessorSchedulerWorkerCount = "history.archivalProcessorSchedulerWorkerCount"
// ArchivalProcessorSchedulerRoundRobinWeights is the priority round robin weights by archival task scheduler for
// all namespaces
ArchivalProcessorSchedulerRoundRobinWeights = "history.archivalProcessorSchedulerRoundRobinWeights"
// ArchivalProcessorMaxPollInterval max poll interval for archivalQueueProcessor
ArchivalProcessorMaxPollInterval = "history.archivalProcessorMaxPollInterval"
// ArchivalProcessorMaxPollIntervalJitterCoefficient is the max poll interval jitter coefficient
Expand All @@ -550,6 +547,8 @@ const (
ArchivalProcessorPollBackoffInterval = "history.archivalProcessorPollBackoffInterval"
// ArchivalProcessorArchiveDelay is the delay before archivalQueueProcessor starts to process archival tasks
ArchivalProcessorArchiveDelay = "history.archivalProcessorArchiveDelay"
// ArchivalProcessorRetryWarningLimit is the number of times an archival task may be retried before we log a warning
ArchivalProcessorRetryWarningLimit = "history.archivalProcessorRetryLimitWarning"

// ReplicatorTaskBatchSize is batch size for ReplicatorProcessor
ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize"
Expand Down
184 changes: 184 additions & 0 deletions service/history/archival_queue_factory.go
@@ -0,0 +1,184 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history

import (
"go.uber.org/fx"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
ctasks "go.temporal.io/server/common/tasks"
"go.temporal.io/server/service/history/archival"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow"
wcache "go.temporal.io/server/service/history/workflow/cache"
)

const (
// archivalQueuePersistenceMaxRPSRatio is the hard-coded ratio of archival queue persistence max RPS to the total
// persistence max RPS.
// In this case, the archival queue may not send requests at a rate higher than 15% of the global persistence max
// RPS.
archivalQueuePersistenceMaxRPSRatio = 0.15
)

var (
// ArchivalTaskPriorities is the map of task priority to weight for the archival queue.
// The archival queue only uses the low task priority, so we only define a weight for that priority.
ArchivalTaskPriorities = configs.ConvertWeightsToDynamicConfigValue(map[ctasks.Priority]int{
ctasks.PriorityLow: 10,
})
)

type (
// ArchivalQueueFactoryParams contains the necessary params to create a new archival queue factory.
ArchivalQueueFactoryParams struct {
// fx.In allows fx to construct this object without an explicitly defined constructor.
fx.In

// QueueFactoryBaseParams contains common params for all queue factories.
QueueFactoryBaseParams
// Archiver is the archival client used to archive history events and visibility records.
Archiver archival.Archiver
// RelocatableAttributesFetcher is the client used to fetch the memo and search attributes of a workflow.
RelocatableAttributesFetcher workflow.RelocatableAttributesFetcher
}

// archivalQueueFactory implements QueueFactory for the archival queue.
archivalQueueFactory struct {
QueueFactoryBase
ArchivalQueueFactoryParams
}
)

// NewArchivalQueueFactory creates a new QueueFactory to construct archival queues.
func NewArchivalQueueFactory(
params ArchivalQueueFactoryParams,
) QueueFactory {
hostScheduler := newScheduler(params)
queueFactoryBase := newQueueFactoryBase(params, hostScheduler)
return &archivalQueueFactory{
ArchivalQueueFactoryParams: params,
QueueFactoryBase: queueFactoryBase,
}
}

// newScheduler creates a new task scheduler for tasks on the archival queue.
func newScheduler(params ArchivalQueueFactoryParams) queues.Scheduler {
return queues.NewPriorityScheduler(
queues.PrioritySchedulerOptions{
WorkerCount: params.Config.ArchivalProcessorSchedulerWorkerCount,
EnableRateLimiter: params.Config.TaskSchedulerEnableRateLimiter,
MaxDispatchThrottleDuration: HostSchedulerMaxDispatchThrottleDuration,
Weight: func() map[string]any {
return ArchivalTaskPriorities
},
},
params.SchedulerRateLimiter,
params.TimeSource,
params.Logger,
)
}

// newQueueFactoryBase creates a new QueueFactoryBase for the archival queue, which contains common configurations
// like the task scheduler, task priority assigner, and rate limiters.
func newQueueFactoryBase(params ArchivalQueueFactoryParams, hostScheduler queues.Scheduler) QueueFactoryBase {
return QueueFactoryBase{
HostScheduler: hostScheduler,
HostPriorityAssigner: queues.NewPriorityAssigner(),
HostRateLimiter: NewQueueHostRateLimiter(
params.Config.ArchivalProcessorMaxPollHostRPS,
params.Config.PersistenceMaxQPS,
archivalQueuePersistenceMaxRPSRatio,
),
HostReaderRateLimiter: queues.NewReaderPriorityRateLimiter(
NewHostRateLimiterRateFn(
params.Config.ArchivalProcessorMaxPollHostRPS,
params.Config.PersistenceMaxQPS,
archivalQueuePersistenceMaxRPSRatio,
),
params.Config.QueueMaxReaderCount(),
),
}
}

// CreateQueue creates a new archival queue for the given shard.
func (f *archivalQueueFactory) CreateQueue(
shard shard.Context,
workflowCache wcache.Cache,
) queues.Queue {
executor := f.newArchivalTaskExecutor(shard, workflowCache)
return f.newScheduledQueue(shard, executor)
}

// newArchivalTaskExecutor creates a new archival task executor for the given shard.
func (f *archivalQueueFactory) newArchivalTaskExecutor(shard shard.Context, workflowCache wcache.Cache) queues.Executor {
return NewArchivalQueueTaskExecutor(
f.Archiver,
shard,
workflowCache,
f.RelocatableAttributesFetcher,
f.MetricsHandler,
f.Logger,
)
}

// newScheduledQueue creates a new scheduled queue for the given shard with archival-specific configurations.
func (f *archivalQueueFactory) newScheduledQueue(shard shard.Context, executor queues.Executor) queues.Queue {
logger := log.With(shard.GetLogger(), tag.ComponentArchivalQueue)
return queues.NewScheduledQueue(
shard,
tasks.CategoryArchival,
f.HostScheduler,
f.HostPriorityAssigner,
executor,
&queues.Options{
ReaderOptions: queues.ReaderOptions{
BatchSize: f.Config.ArchivalTaskBatchSize,
MaxPendingTasksCount: f.Config.QueuePendingTaskMaxCount,
PollBackoffInterval: f.Config.ArchivalProcessorPollBackoffInterval,
},
MonitorOptions: queues.MonitorOptions{
PendingTasksCriticalCount: f.Config.QueuePendingTaskCriticalCount,
ReaderStuckCriticalAttempts: f.Config.QueueReaderStuckCriticalAttempts,
SliceCountCriticalThreshold: f.Config.QueueCriticalSlicesCount,
},
MaxPollRPS: f.Config.ArchivalProcessorMaxPollRPS,
MaxPollInterval: f.Config.ArchivalProcessorMaxPollInterval,
MaxPollIntervalJitterCoefficient: f.Config.ArchivalProcessorMaxPollIntervalJitterCoefficient,
CheckpointInterval: f.Config.ArchivalProcessorUpdateAckInterval,
CheckpointIntervalJitterCoefficient: f.Config.ArchivalProcessorUpdateAckIntervalJitterCoefficient,
MaxReaderCount: f.Config.QueueMaxReaderCount,
TaskMaxRetryCount: f.Config.ArchivalProcessorRetryWarningLimit,
},
f.HostReaderRateLimiter,
logger,
f.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationArchivalQueueProcessorScope)),
)
}
75 changes: 75 additions & 0 deletions service/history/archival_queue_factory_test.go
@@ -0,0 +1,75 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history

import (
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/tests"
)

func TestArchivalQueueFactory(t *testing.T) {
ctrl := gomock.NewController(t)
metricsHandler := metrics.NewMockMetricsHandler(ctrl)
metricsHandler.EXPECT().WithTags(gomock.Any()).Do(func(tags ...metrics.Tag) metrics.MetricsHandler {
require.Len(t, tags, 1)
assert.Equal(t, metrics.OperationTagName, tags[0].Key())
assert.Equal(t, "ArchivalQueueProcessor", tags[0].Value())
return metricsHandler
})
shardContext := shard.NewMockContext(ctrl)
shardContext.EXPECT().GetLogger().Return(log.NewNoopLogger())
shardContext.EXPECT().GetQueueState(tasks.CategoryArchival).Return(&persistence.QueueState{
ReaderStates: nil,
ExclusiveReaderHighWatermark: &persistence.TaskKey{
FireTime: timestamp.TimeNowPtrUtc(),
},
}, true)
shardContext.EXPECT().GetTimeSource().Return(namespace.NewMockClock(ctrl)).AnyTimes()

queueFactory := NewArchivalQueueFactory(ArchivalQueueFactoryParams{
QueueFactoryBaseParams: QueueFactoryBaseParams{
Config: tests.NewDynamicConfig(),
TimeSource: namespace.NewMockClock(ctrl),
MetricsHandler: metricsHandler,
Logger: log.NewNoopLogger(),
},
})
queue := queueFactory.CreateQueue(shardContext, nil)

require.NotNil(t, queue)
assert.Equal(t, tasks.CategoryArchival, queue.Category())
}
14 changes: 7 additions & 7 deletions service/history/configs/config.go
Expand Up @@ -294,7 +294,6 @@ type Config struct {

// ArchivalQueueProcessor settings
ArchivalProcessorSchedulerWorkerCount dynamicconfig.IntPropertyFn
ArchivalProcessorSchedulerRoundRobinWeights dynamicconfig.MapPropertyFnWithNamespaceFilter
ArchivalProcessorMaxPollHostRPS dynamicconfig.IntPropertyFn
ArchivalTaskBatchSize dynamicconfig.IntPropertyFn
ArchivalProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn
Expand All @@ -304,6 +303,7 @@ type Config struct {
ArchivalProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
ArchivalProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
ArchivalProcessorArchiveDelay dynamicconfig.DurationPropertyFn
ArchivalProcessorRetryWarningLimit dynamicconfig.IntPropertyFn
}

const (
Expand Down Expand Up @@ -530,19 +530,19 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
NamespaceCacheRefreshInterval: dc.GetDurationProperty(dynamicconfig.NamespaceCacheRefreshInterval, 10*time.Second),

// Archival related
ArchivalTaskBatchSize: dc.GetIntProperty(dynamicconfig.ArchivalTaskBatchSize, 100),
ArchivalProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.ArchivalProcessorMaxPollRPS, 20),
ArchivalProcessorMaxPollHostRPS: dc.GetIntProperty(dynamicconfig.ArchivalProcessorMaxPollHostRPS, 0),
ArchivalProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.ArchivalProcessorSchedulerWorkerCount, 512),
ArchivalProcessorSchedulerRoundRobinWeights: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.ArchivalProcessorSchedulerRoundRobinWeights, ConvertWeightsToDynamicConfigValue(DefaultActiveTaskPriorityWeight)),
ArchivalProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorMaxPollInterval, 1*time.Minute),
ArchivalTaskBatchSize: dc.GetIntProperty(dynamicconfig.ArchivalTaskBatchSize, 100),
ArchivalProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.ArchivalProcessorMaxPollRPS, 20),
ArchivalProcessorMaxPollHostRPS: dc.GetIntProperty(dynamicconfig.ArchivalProcessorMaxPollHostRPS, 0),
ArchivalProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.ArchivalProcessorSchedulerWorkerCount, 512),
ArchivalProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorMaxPollInterval, 5*time.Minute),
ArchivalProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.
ArchivalProcessorMaxPollIntervalJitterCoefficient, 0.15),
ArchivalProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorUpdateAckInterval, 30*time.Second),
ArchivalProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.
ArchivalProcessorUpdateAckIntervalJitterCoefficient, 0.15),
ArchivalProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorPollBackoffInterval, 5*time.Second),
ArchivalProcessorArchiveDelay: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorArchiveDelay, 5*time.Minute),
ArchivalProcessorRetryWarningLimit: dc.GetIntProperty(dynamicconfig.ArchivalProcessorRetryWarningLimit, 100),
}

return cfg
Expand Down
15 changes: 15 additions & 0 deletions service/history/queues/priority_assigner_test.go
Expand Up @@ -83,3 +83,18 @@ func (s *priorityAssignerSuite) TestAssign_HighPriorityTaskTypes() {

s.Equal(tasks.PriorityHigh, s.priorityAssigner.Assign(mockExecutable))
}

func (s *priorityAssignerSuite) TestAssign_LowPriorityTaskTypes() {
for _, taskType := range []enumsspb.TaskType{
enumsspb.TASK_TYPE_DELETE_HISTORY_EVENT,
enumsspb.TASK_TYPE_TRANSFER_DELETE_EXECUTION,
enumsspb.TASK_TYPE_VISIBILITY_DELETE_EXECUTION,
enumsspb.TASK_TYPE_ARCHIVAL_ARCHIVE_EXECUTION,
enumsspb.TASK_TYPE_UNSPECIFIED,
} {
mockExecutable := NewMockExecutable(s.controller)
mockExecutable.EXPECT().GetType().Return(taskType).Times(1)

s.Equal(tasks.PriorityLow, s.priorityAssigner.Assign(mockExecutable))
}
}

0 comments on commit d082323

Please sign in to comment.