Skip to content

Commit

Permalink
Add an archival queue factory
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Dec 1, 2022
1 parent 0457a6d commit dd30d67
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 10 deletions.
5 changes: 2 additions & 3 deletions common/dynamicconfig/constants.go
Expand Up @@ -534,9 +534,6 @@ const (
// ArchivalProcessorSchedulerWorkerCount is the number of workers in the host level task scheduler for
// archivalQueueProcessor
ArchivalProcessorSchedulerWorkerCount = "history.archivalProcessorSchedulerWorkerCount"
// ArchivalProcessorSchedulerRoundRobinWeights is the priority round robin weights by archival task scheduler for
// all namespaces
ArchivalProcessorSchedulerRoundRobinWeights = "history.archivalProcessorSchedulerRoundRobinWeights"
// ArchivalProcessorMaxPollInterval max poll interval for archivalQueueProcessor
ArchivalProcessorMaxPollInterval = "history.archivalProcessorMaxPollInterval"
// ArchivalProcessorMaxPollIntervalJitterCoefficient is the max poll interval jitter coefficient
Expand All @@ -548,6 +545,8 @@ const (
// ArchivalProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for
// archivalQueueProcessor
ArchivalProcessorPollBackoffInterval = "history.archivalProcessorPollBackoffInterval"
// ArchivalProcessorRetryWarningLimit is the number of times an archival task may be retried before we log a warning
ArchivalProcessorRetryWarningLimit = "history.archivalProcessorRetryLimitWarning"

// ReplicatorTaskBatchSize is batch size for ReplicatorProcessor
ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize"
Expand Down
130 changes: 130 additions & 0 deletions service/history/archivalQueueFactory.go
@@ -0,0 +1,130 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history

import (
"go.uber.org/fx"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/service/history/archival"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow"
)

const (
archivalQueuePersistenceMaxRPSRatio = 0.15
)

type (
archivalQueueFactoryParams struct {
fx.In

QueueFactoryBaseParams
}

archivalQueueFactory struct {
archivalQueueFactoryParams
QueueFactoryBase
archiver archival.Archiver
}
)

func NewArchivalQueueFactory(
params archivalQueueFactoryParams,
) QueueFactory {
hostScheduler := queues.NewPriorityScheduler(
queues.PrioritySchedulerOptions{
WorkerCount: params.Config.ArchivalProcessorSchedulerWorkerCount,
EnableRateLimiter: params.Config.TaskSchedulerEnableRateLimiter,
MaxDispatchThrottleDuration: HostSchedulerMaxDispatchThrottleDuration,
},
params.SchedulerRateLimiter,
params.TimeSource,
params.Logger,
)
return &archivalQueueFactory{
archivalQueueFactoryParams: params,
QueueFactoryBase: QueueFactoryBase{
HostScheduler: hostScheduler,
HostPriorityAssigner: queues.NewPriorityAssigner(),
HostRateLimiter: NewQueueHostRateLimiter(
params.Config.ArchivalProcessorMaxPollHostRPS,
params.Config.PersistenceMaxQPS,
archivalQueuePersistenceMaxRPSRatio,
),
HostReaderRateLimiter: queues.NewReaderPriorityRateLimiter(
NewHostRateLimiterRateFn(
params.Config.ArchivalProcessorMaxPollHostRPS,
params.Config.PersistenceMaxQPS,
archivalQueuePersistenceMaxRPSRatio,
),
params.Config.QueueMaxReaderCount(),
),
},
}
}

func (f *archivalQueueFactory) CreateQueue(
shard shard.Context,
workflowCache workflow.Cache,
) queues.Queue {
logger := log.With(shard.GetLogger(), tag.ComponentArchivalQueue)

executor := newArchivalQueueTaskExecutor(f.archiver, shard, workflowCache, f.MetricsHandler, f.Logger)

return queues.NewImmediateQueue(
shard,
tasks.CategoryArchival,
f.HostScheduler,
f.HostPriorityAssigner,
executor,
&queues.Options{
ReaderOptions: queues.ReaderOptions{
BatchSize: f.Config.ArchivalTaskBatchSize,
MaxPendingTasksCount: f.Config.QueuePendingTaskMaxCount,
PollBackoffInterval: f.Config.ArchivalProcessorPollBackoffInterval,
},
MonitorOptions: queues.MonitorOptions{
PendingTasksCriticalCount: f.Config.QueuePendingTaskCriticalCount,
ReaderStuckCriticalAttempts: f.Config.QueueReaderStuckCriticalAttempts,
SliceCountCriticalThreshold: f.Config.QueueCriticalSlicesCount,
},
MaxPollRPS: f.Config.ArchivalProcessorMaxPollRPS,
MaxPollInterval: f.Config.ArchivalProcessorMaxPollInterval,
MaxPollIntervalJitterCoefficient: f.Config.ArchivalProcessorMaxPollIntervalJitterCoefficient,
CheckpointInterval: f.Config.ArchivalProcessorUpdateAckInterval,
CheckpointIntervalJitterCoefficient: f.Config.ArchivalProcessorUpdateAckIntervalJitterCoefficient,
MaxReaderCount: f.Config.QueueMaxReaderCount,
TaskMaxRetryCount: f.Config.ArchivalProcessorRetryWarningLimit,
},
f.HostReaderRateLimiter,
logger,
f.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationArchivalQueueProcessorScope)),
)
}
75 changes: 75 additions & 0 deletions service/history/archivalQueueFactory_test.go
@@ -0,0 +1,75 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history

import (
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/tests"
)

func TestArchivalQueueFactory(t *testing.T) {
ctrl := gomock.NewController(t)
metricsHandler := metrics.NewMockMetricsHandler(ctrl)
metricsHandler.EXPECT().WithTags(gomock.Any()).Do(func(tags ...metrics.Tag) metrics.MetricsHandler {
require.Len(t, tags, 1)
assert.Equal(t, metrics.OperationTagName, tags[0].Key())
assert.Equal(t, "ArchivalQueueProcessor", tags[0].Value())
return metricsHandler
})
shardContext := shard.NewMockContext(ctrl)
shardContext.EXPECT().GetLogger().Return(log.NewNoopLogger())
shardContext.EXPECT().GetQueueState(tasks.CategoryArchival).Return(&persistence.QueueState{
ReaderStates: nil,
ExclusiveReaderHighWatermark: &persistence.TaskKey{
FireTime: timestamp.TimeNowPtrUtc(),
},
}, true)
shardContext.EXPECT().GetTimeSource().Return(namespace.NewMockClock(ctrl)).AnyTimes()

queueFactory := NewArchivalQueueFactory(archivalQueueFactoryParams{
QueueFactoryBaseParams: QueueFactoryBaseParams{
Config: tests.NewDynamicConfig(),
TimeSource: namespace.NewMockClock(ctrl),
MetricsHandler: metricsHandler,
Logger: log.NewNoopLogger(),
},
})
queue := queueFactory.CreateQueue(shardContext, nil)

require.NotNil(t, queue)
assert.Equal(t, tasks.CategoryArchival, queue.Category())
}
14 changes: 7 additions & 7 deletions service/history/configs/config.go
Expand Up @@ -294,7 +294,6 @@ type Config struct {

// ArchivalQueueProcessor settings
ArchivalProcessorSchedulerWorkerCount dynamicconfig.IntPropertyFn
ArchivalProcessorSchedulerRoundRobinWeights dynamicconfig.MapPropertyFnWithNamespaceFilter
ArchivalProcessorMaxPollHostRPS dynamicconfig.IntPropertyFn
ArchivalTaskBatchSize dynamicconfig.IntPropertyFn
ArchivalProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn
Expand All @@ -303,6 +302,7 @@ type Config struct {
ArchivalProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
ArchivalProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
ArchivalProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
ArchivalProcessorRetryWarningLimit dynamicconfig.IntPropertyFn
}

const (
Expand Down Expand Up @@ -529,18 +529,18 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
NamespaceCacheRefreshInterval: dc.GetDurationProperty(dynamicconfig.NamespaceCacheRefreshInterval, 10*time.Second),

// Archival related
ArchivalTaskBatchSize: dc.GetIntProperty(dynamicconfig.ArchivalTaskBatchSize, 100),
ArchivalProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.ArchivalProcessorMaxPollRPS, 20),
ArchivalProcessorMaxPollHostRPS: dc.GetIntProperty(dynamicconfig.ArchivalProcessorMaxPollHostRPS, 0),
ArchivalProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.ArchivalProcessorSchedulerWorkerCount, 512),
ArchivalProcessorSchedulerRoundRobinWeights: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.ArchivalProcessorSchedulerRoundRobinWeights, ConvertWeightsToDynamicConfigValue(DefaultActiveTaskPriorityWeight)),
ArchivalProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorMaxPollInterval, 1*time.Minute),
ArchivalTaskBatchSize: dc.GetIntProperty(dynamicconfig.ArchivalTaskBatchSize, 100),
ArchivalProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.ArchivalProcessorMaxPollRPS, 20),
ArchivalProcessorMaxPollHostRPS: dc.GetIntProperty(dynamicconfig.ArchivalProcessorMaxPollHostRPS, 0),
ArchivalProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.ArchivalProcessorSchedulerWorkerCount, 512),
ArchivalProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorMaxPollInterval, 1*time.Minute),
ArchivalProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.
ArchivalProcessorMaxPollIntervalJitterCoefficient, 0.15),
ArchivalProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorUpdateAckInterval, 30*time.Second),
ArchivalProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.
ArchivalProcessorUpdateAckIntervalJitterCoefficient, 0.15),
ArchivalProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorPollBackoffInterval, 5*time.Second),
ArchivalProcessorRetryWarningLimit: dc.GetIntProperty(dynamicconfig.ArchivalProcessorRetryWarningLimit, 100),
}

return cfg
Expand Down
15 changes: 15 additions & 0 deletions service/history/queues/priority_assigner_test.go
Expand Up @@ -83,3 +83,18 @@ func (s *priorityAssignerSuite) TestAssign_HighPriorityTaskTypes() {

s.Equal(tasks.PriorityHigh, s.priorityAssigner.Assign(mockExecutable))
}

func (s *priorityAssignerSuite) TestAssign_LowPriorityTaskTypes() {
for _, taskType := range []enumsspb.TaskType{
enumsspb.TASK_TYPE_DELETE_HISTORY_EVENT,
enumsspb.TASK_TYPE_TRANSFER_DELETE_EXECUTION,
enumsspb.TASK_TYPE_VISIBILITY_DELETE_EXECUTION,
enumsspb.TASK_TYPE_ARCHIVAL_ARCHIVE_EXECUTION,
enumsspb.TASK_TYPE_UNSPECIFIED,
} {
mockExecutable := NewMockExecutable(s.controller)
mockExecutable.EXPECT().GetType().Return(taskType).Times(1)

s.Equal(tasks.PriorityLow, s.priorityAssigner.Assign(mockExecutable))
}
}

0 comments on commit dd30d67

Please sign in to comment.