Skip to content

Commit

Permalink
Enable archival queue factory iff it is enabled in the static config
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Jan 23, 2023
1 parent 4ba7bf0 commit 299be8f
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 6 deletions.
5 changes: 5 additions & 0 deletions common/archiver/archivalMetadata.go
Expand Up @@ -52,6 +52,7 @@ type (
ReadEnabled() bool
GetNamespaceDefaultState() enumspb.ArchivalState
GetNamespaceDefaultURI() string
StaticClusterState() ArchivalState
}

archivalMetadata struct {
Expand All @@ -71,6 +72,10 @@ type (
ArchivalState int
)

func (a *archivalConfig) StaticClusterState() ArchivalState {
return a.staticClusterState
}

const (
// ArchivalDisabled means this cluster is not configured to handle archival
ArchivalDisabled ArchivalState = iota
Expand Down
14 changes: 14 additions & 0 deletions common/archiver/archivalMetadata_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion service/history/historyEngineFactory.go
Expand Up @@ -55,7 +55,7 @@ type (
NewCacheFn wcache.NewCacheFn
ArchivalClient archiver.Client
EventSerializer serialization.Serializer
QueueFactories []QueueFactory `group:"queueFactory"`
QueueFactories []QueueFactory
ReplicationTaskFetcherFactory replication.TaskFetcherFactory
ReplicationTaskExecutorProvider replication.TaskExecutorProvider
TracerProvider trace.TracerProvider
Expand Down
38 changes: 33 additions & 5 deletions service/history/queueFactoryBase.go
Expand Up @@ -31,6 +31,7 @@ import (
"go.uber.org/fx"

"go.temporal.io/server/common"
"go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
Expand Down Expand Up @@ -87,33 +88,60 @@ type (
fx.In

Lifecycle fx.Lifecycle
Factories []QueueFactory `group:"queueFactory"`
Factories []QueueFactory
}
)

var QueueModule = fx.Options(
fx.Provide(QueueSchedulerRateLimiterProvider),
fx.Provide(
fx.Annotated{
Group: QueueFactoryFxGroup,
Name: "transferQueueFactory",
Target: NewTransferQueueFactory,
},
fx.Annotated{
Group: QueueFactoryFxGroup,
Name: "timerQueueFactory",
Target: NewTimerQueueFactory,
},
fx.Annotated{
Group: QueueFactoryFxGroup,
Name: "visibilityQueueFactory",
Target: NewVisibilityQueueFactory,
},
fx.Annotated{
Group: QueueFactoryFxGroup,
Name: "archivalQueueFactory",
Target: NewArchivalQueueFactory,
},
getQueueFactories,
),
fx.Invoke(QueueFactoryLifetimeHooks),
)

type queueFactorySet struct {
fx.In

TransferQueueFactory QueueFactory `name:"transferQueueFactory"`
TimerQueueFactory QueueFactory `name:"timerQueueFactory"`
VisibilityQueueFactory QueueFactory `name:"visibilityQueueFactory"`
ArchivalQueueFactory QueueFactory `name:"archivalQueueFactory"`
}

// getQueueFactories returns factories for all the enabled queue types.
// The archival queue factory is only returned when archival is enabled in the static config.
func getQueueFactories(
queueFactorySet queueFactorySet,
archivalMetadata archiver.ArchivalMetadata,
) []QueueFactory {
factories := []QueueFactory{
queueFactorySet.TransferQueueFactory,
queueFactorySet.TimerQueueFactory,
queueFactorySet.VisibilityQueueFactory,
}
if archivalMetadata.GetHistoryConfig().StaticClusterState() == archiver.ArchivalEnabled || archivalMetadata.GetVisibilityConfig().StaticClusterState() == archiver.ArchivalEnabled {
factories = append(factories, queueFactorySet.ArchivalQueueFactory)
}
return factories
}

func QueueSchedulerRateLimiterProvider(
config *configs.Config,
) queues.SchedulerRateLimiter {
Expand Down
192 changes: 192 additions & 0 deletions service/history/queue_factory_base_test.go
@@ -0,0 +1,192 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history

import (
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/fx"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/client"
carchiver "go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/service/history/archival"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/workflow"
"go.temporal.io/server/service/worker/archiver"
)

// TestQueueModule_ArchivalQueueCreated tests that the archival queue is created if and only if the static config for
// either history or visibility archival is enabled.
func TestQueueModule_ArchivalQueue(t *testing.T) {
for _, c := range []moduleTestCase{
{
Name: "Archival completely disabled",
HistoryState: carchiver.ArchivalDisabled,
VisibilityState: carchiver.ArchivalDisabled,
ExpectArchivalQueue: false,
},
{
Name: "History archival enabled",
HistoryState: carchiver.ArchivalEnabled,
VisibilityState: carchiver.ArchivalDisabled,
ExpectArchivalQueue: true,
},
{
Name: "Visibility archival enabled",
HistoryState: carchiver.ArchivalDisabled,
VisibilityState: carchiver.ArchivalEnabled,
ExpectArchivalQueue: true,
},
{
Name: "Both history and visibility archival enabled",
HistoryState: carchiver.ArchivalEnabled,
VisibilityState: carchiver.ArchivalEnabled,
ExpectArchivalQueue: true,
},
} {
c := c
t.Run(c.Name, c.Run)
}
}

// moduleTestCase is a test case for the QueueModule.
type moduleTestCase struct {
Name string
HistoryState carchiver.ArchivalState
VisibilityState carchiver.ArchivalState
ExpectArchivalQueue bool
}

// Run runs the test case.
func (c *moduleTestCase) Run(t *testing.T) {
t.Parallel()
controller := gomock.NewController(t)
dependencies := getModuleDependencies(controller, c)
var factories []QueueFactory

app := fx.New(
dependencies,
QueueModule,
fx.Invoke(func(params QueueFactoriesLifetimeHookParams) {
factories = params.Factories
}),
)

require.NoError(t, app.Err())
require.NotNil(t, factories)
var (
txq QueueFactory
tiq QueueFactory
viq QueueFactory
aq QueueFactory
)
for _, f := range factories {
switch f.(type) {
case *transferQueueFactory:
require.Nil(t, txq)
txq = f
case *timerQueueFactory:
require.Nil(t, tiq)
tiq = f
case *visibilityQueueFactory:
require.Nil(t, viq)
viq = f
case *archivalQueueFactory:
require.Nil(t, aq)
aq = f
}
}
require.NotNil(t, txq)
require.NotNil(t, tiq)
require.NotNil(t, viq)
if c.ExpectArchivalQueue {
require.NotNil(t, aq)
} else {
require.Nil(t, aq)
}
}

// getModuleDependencies returns an fx.Option that provides all the dependencies needed for the queue module.
func getModuleDependencies(controller *gomock.Controller, c *moduleTestCase) fx.Option {
cfg := configs.NewConfig(
dynamicconfig.NewNoopCollection(),
1,
false,
"",
)
archivalMetadata := getArchivalMetadata(controller, c)
clusterMetadata := cluster.NewMockMetadata(controller)
clusterMetadata.EXPECT().GetCurrentClusterName().Return("module-test-cluster-name").AnyTimes()
return fx.Supply(
compileTimeDependencies{},
cfg,
fx.Annotate(archivalMetadata, fx.As(new(carchiver.ArchivalMetadata))),
fx.Annotate(metrics.NoopMetricsHandler, fx.As(new(metrics.Handler))),
fx.Annotate(clusterMetadata, fx.As(new(cluster.Metadata))),
)
}

// compileTimeDependencies is a struct that provides nil implementations of all the dependencies needed for the queue
// module that are not required for the test at runtime.
type compileTimeDependencies struct {
fx.Out

namespace.Registry
clock.TimeSource
log.SnTaggedLogger
client.Bean
archiver.Client
sdk.ClientFactory
resource.MatchingClient
historyservice.HistoryServiceClient
manager.VisibilityManager
archival.Archiver
workflow.RelocatableAttributesFetcher
}

// getArchivalMetadata returns a mock ArchivalMetadata that contains the static archival config specified in the given
// test case.
func getArchivalMetadata(controller *gomock.Controller, c *moduleTestCase) *carchiver.MockArchivalMetadata {
archivalMetadata := carchiver.NewMockArchivalMetadata(controller)
historyConfig := carchiver.NewMockArchivalConfig(controller)
visibilityConfig := carchiver.NewMockArchivalConfig(controller)
historyConfig.EXPECT().StaticClusterState().Return(c.HistoryState).AnyTimes()
visibilityConfig.EXPECT().StaticClusterState().Return(c.VisibilityState).AnyTimes()
archivalMetadata.EXPECT().GetHistoryConfig().Return(historyConfig).AnyTimes()
archivalMetadata.EXPECT().GetVisibilityConfig().Return(visibilityConfig).AnyTimes()
return archivalMetadata
}

0 comments on commit 299be8f

Please sign in to comment.