diff --git a/service/history/archivalQueueFactory.go b/service/history/archivalQueueFactory.go new file mode 100644 index 000000000000..8ba243474e37 --- /dev/null +++ b/service/history/archivalQueueFactory.go @@ -0,0 +1,132 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "go.uber.org/fx" + + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/service/history/archival" + "go.temporal.io/server/service/history/queues" + "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tasks" + "go.temporal.io/server/service/history/workflow" +) + +const ( + archivalQueuePersistenceMaxRPSRatio = 0.15 +) + +type ( + archivalQueueFactoryParams struct { + fx.In + + QueueFactoryBaseParams + } + + archivalQueueFactory struct { + *archivalQueueFactoryParams + QueueFactoryBase + archiver archival.Archiver + } +) + +func NewArchivalQueueFactory( + params *archivalQueueFactoryParams, +) QueueFactory { + hostScheduler := queues.NewNamespacePriorityScheduler( + params.ClusterMetadata.GetCurrentClusterName(), + queues.NamespacePrioritySchedulerOptions{ + WorkerCount: params.Config.ArchivalProcessorSchedulerWorkerCount, + // we don't need standby weights because we only run in the active cluster + ActiveNamespaceWeights: params.Config.ArchivalProcessorSchedulerRoundRobinWeights, + }, + params.NamespaceRegistry, + params.TimeSource, + params.MetricsHandler.WithTags(metrics.OperationTag(queues.OperationArchivalQueueProcessor)), + params.Logger, + ) + return &archivalQueueFactory{ + archivalQueueFactoryParams: params, + QueueFactoryBase: QueueFactoryBase{ + HostScheduler: hostScheduler, + HostPriorityAssigner: queues.NewPriorityAssigner(), + HostRateLimiter: NewQueueHostRateLimiter( + params.Config.ArchivalProcessorMaxPollHostRPS, + params.Config.PersistenceMaxQPS, + archivalQueuePersistenceMaxRPSRatio, + ), + HostReaderRateLimiter: queues.NewReaderPriorityRateLimiter( + NewHostRateLimiterRateFn( + params.Config.ArchivalProcessorMaxPollHostRPS, + params.Config.PersistenceMaxQPS, + archivalQueuePersistenceMaxRPSRatio, + ), + params.Config.QueueMaxReaderCount(), + ), + }, + } +} + +func (f *archivalQueueFactory) CreateQueue( + shard shard.Context, + workflowCache workflow.Cache, +) queues.Queue { + logger := log.With(shard.GetLogger(), tag.ComponentArchivalQueue) + + executor := newArchivalQueueTaskExecutor(f.archiver, shard, workflowCache, f.MetricsHandler, f.Logger) + + return queues.NewImmediateQueue( + shard, + tasks.CategoryArchival, + f.HostScheduler, + f.HostPriorityAssigner, + executor, + &queues.Options{ + ReaderOptions: queues.ReaderOptions{ + BatchSize: f.Config.ArchivalTaskBatchSize, + MaxPendingTasksCount: f.Config.QueuePendingTaskMaxCount, + PollBackoffInterval: f.Config.ArchivalProcessorPollBackoffInterval, + }, + MonitorOptions: queues.MonitorOptions{ + PendingTasksCriticalCount: f.Config.QueuePendingTaskCriticalCount, + ReaderStuckCriticalAttempts: f.Config.QueueReaderStuckCriticalAttempts, + SliceCountCriticalThreshold: f.Config.QueueCriticalSlicesCount, + }, + MaxPollRPS: f.Config.ArchivalProcessorMaxPollRPS, + MaxPollInterval: f.Config.ArchivalProcessorMaxPollInterval, + MaxPollIntervalJitterCoefficient: f.Config.ArchivalProcessorMaxPollIntervalJitterCoefficient, + CheckpointInterval: f.Config.ArchivalProcessorUpdateAckInterval, + CheckpointIntervalJitterCoefficient: f.Config.ArchivalProcessorUpdateAckIntervalJitterCoefficient, + MaxReaderCount: f.Config.QueueMaxReaderCount, + TaskMaxRetryCount: f.Config.ArchivalTaskMaxRetryCount, + }, + f.HostReaderRateLimiter, + logger, + f.MetricsHandler.WithTags(metrics.OperationTag(queues.OperationArchivalQueueProcessor)), + ) +} diff --git a/service/history/archivalQueueFactory_test.go b/service/history/archivalQueueFactory_test.go new file mode 100644 index 000000000000..ad255b79680d --- /dev/null +++ b/service/history/archivalQueueFactory_test.go @@ -0,0 +1,98 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/service/history/configs" + "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tasks" + "go.temporal.io/server/service/history/workflow" +) + +func TestArchivalQueueFactory(t *testing.T) { + ctrl := gomock.NewController(t) + namespaceRegistry := namespace.NewMockRegistry(ctrl) + clusterMetadata := cluster.NewMockMetadata(ctrl) + cfg := &configs.Config{ + ArchivalProcessorMaxPollHostRPS: func() int { + return 1 + }, + PersistenceMaxQPS: func() int { + return 1 + }, + QueueMaxReaderCount: func() int { + return 1 + }, + } + timeSource := namespace.NewMockClock(ctrl) + metricsHandler := metrics.NewMockMetricsHandler(ctrl) + logger := log.NewNoopLogger() + + clusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName) + metricsHandler.EXPECT().WithTags(gomock.Any()).Do(func(tags ...metrics.Tag) metrics.MetricsHandler { + require.Len(t, tags, 1) + assert.Equal(t, metrics.OperationTagName, tags[0].Key()) + assert.Equal(t, "ArchivalQueueProcessor", tags[0].Value()) + return metricsHandler + }).Times(2) + + queueFactory := NewArchivalQueueFactory(&archivalQueueFactoryParams{ + QueueFactoryBaseParams: QueueFactoryBaseParams{ + NamespaceRegistry: namespaceRegistry, + ClusterMetadata: clusterMetadata, + Config: cfg, + TimeSource: timeSource, + MetricsHandler: metricsHandler, + Logger: logger, + }, + }) + shardContext := shard.NewMockContext(ctrl) + workflowCache := workflow.NewMockCache(ctrl) + println(queueFactory, shardContext, workflowCache) + + shardContext.EXPECT().GetLogger().Return(logger) + shardContext.EXPECT().GetQueueState(tasks.CategoryArchival).Return(&persistence.QueueState{ + ReaderStates: nil, + ExclusiveReaderHighWatermark: &persistence.TaskKey{ + FireTime: timestamp.TimeNowPtrUtc(), + }, + }, true) + shardContext.EXPECT().GetTimeSource().Return(timeSource).AnyTimes() + + queue := queueFactory.CreateQueue(shardContext, workflowCache) + assert.NotNil(t, queue) +}