Skip to content

Commit

Permalink
Separate read-only category index from write-only registry
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Jan 28, 2023
1 parent 3484bed commit 61a16f8
Show file tree
Hide file tree
Showing 20 changed files with 230 additions and 142 deletions.
22 changes: 11 additions & 11 deletions common/resourcetest/resourceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ type (
ExecutionMgr *persistence.MockExecutionManager
PersistenceBean *persistenceClient.MockBean

Logger log.Logger
TaskCategoryRegistry tasks.CategoryRegistry
Logger log.Logger
TaskCategoryIndex tasks.CategoryIndex
}
)

Expand Down Expand Up @@ -184,13 +184,13 @@ func NewTest(

// other common resources

NamespaceCache: namespace.NewMockRegistry(controller),
TimeSource: clock.NewRealTimeSource(),
PayloadSerializer: serialization.NewSerializer(),
MetricsHandler: metricsHandler,
ArchivalMetadata: archiver.NewMetadataMock(controller),
ArchiverProvider: provider.NewMockArchiverProvider(controller),
TaskCategoryRegistry: tasks.NewDefaultCategoryRegistry(),
NamespaceCache: namespace.NewMockRegistry(controller),
TimeSource: clock.NewRealTimeSource(),
PayloadSerializer: serialization.NewSerializer(),
MetricsHandler: metricsHandler,
ArchivalMetadata: archiver.NewMetadataMock(controller),
ArchiverProvider: provider.NewMockArchiverProvider(controller),
TaskCategoryIndex: tasks.CategoryRegistryProvider().BuildCategoryIndex(),

// membership infos

Expand Down Expand Up @@ -450,6 +450,6 @@ func (t *Test) RefreshNamespaceCache() {
t.NamespaceCache.Refresh()
}

func (t *Test) GetTaskCategoryRegistry() tasks.CategoryRegistry {
return t.TaskCategoryRegistry
func (t *Test) GetTaskCategoryIndex() tasks.CategoryIndex {
return t.TaskCategoryIndex
}
8 changes: 4 additions & 4 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ type (
saManager searchattribute.Manager
clusterMetadata cluster.Metadata
healthServer *health.Server
taskCategoryRegistry tasks.CategoryRegistry
taskCategoryIndex tasks.CategoryIndex
}

NewAdminHandlerArgs struct {
Expand Down Expand Up @@ -148,7 +148,7 @@ type (
HealthServer *health.Server
EventSerializer serialization.Serializer
TimeSource clock.TimeSource
TaskCategoryRegistry tasks.CategoryRegistry
TaskCategoryIndex tasks.CategoryIndex
}
)

Expand Down Expand Up @@ -197,7 +197,7 @@ func NewAdminHandler(
saManager: args.SaManager,
clusterMetadata: args.ClusterMetadata,
healthServer: args.HealthServer,
taskCategoryRegistry: args.TaskCategoryRegistry,
taskCategoryIndex: args.TaskCategoryIndex,
}
}

Expand Down Expand Up @@ -518,7 +518,7 @@ func (adh *AdminHandler) ListHistoryTasks(
return nil, errTaskRangeNotSet
}

taskCategory, ok := adh.taskCategoryRegistry.GetCategoryByID(int32(request.Category))
taskCategory, ok := adh.taskCategoryIndex.GetCategoryByID(int32(request.Category))
if !ok {
return nil, &serviceerror.InvalidArgument{
Message: fmt.Sprintf("unknown task category: %v", request.Category),
Expand Down
2 changes: 1 addition & 1 deletion service/frontend/adminHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (s *adminHandlerSuite) SetupTest() {
health.NewServer(),
serialization.NewSerializer(),
clock.NewRealTimeSource(),
s.mockResource.GetTaskCategoryRegistry(),
s.mockResource.GetTaskCategoryIndex(),
}
s.mockMetadata.EXPECT().GetCurrentClusterName().Return(uuid.New()).AnyTimes()
s.handler = NewAdminHandler(args)
Expand Down
4 changes: 2 additions & 2 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func AdminHandlerProvider(
healthServer *health.Server,
eventSerializer serialization.Serializer,
timeSource clock.TimeSource,
taskCategoryRegistry tasks.CategoryRegistry,
taskCategoryIndex tasks.CategoryIndex,
) *AdminHandler {
args := NewAdminHandlerArgs{
persistenceConfig,
Expand Down Expand Up @@ -469,7 +469,7 @@ func AdminHandlerProvider(
healthServer,
eventSerializer,
timeSource,
taskCategoryRegistry,
taskCategoryIndex,
}
return NewAdminHandler(args)
}
Expand Down
16 changes: 16 additions & 0 deletions service/history/archival_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package history
import (
"go.uber.org/fx"

"go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
Expand Down Expand Up @@ -186,3 +187,18 @@ func (f *archivalQueueFactory) newScheduledQueue(shard shard.Context, executor q
metricsHandler,
)
}

// archivalTaskCategoryDecorator adds the archival task category to the task category registry if archival is enabled
// in the cluster's static config for either history or visibility.
func archivalTaskCategoryDecorator(
archivalMetadata archiver.ArchivalMetadata,
registry tasks.CategoryRegistry,
) (tasks.CategoryRegistry, error) {
if archivalMetadata.GetHistoryConfig().StaticClusterState() == archiver.ArchivalEnabled ||
archivalMetadata.GetVisibilityConfig().StaticClusterState() == archiver.ArchivalEnabled {
if err := registry.RegisterCategory(tasks.CategoryArchival); err != nil {
return nil, err
}
}
return registry, nil
}
2 changes: 1 addition & 1 deletion service/history/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func HandlerProvider(args NewHandlerArgs) *Handler {
eventNotifier: args.EventNotifier,
replicationTaskFetcherFactory: args.ReplicationTaskFetcherFactory,
tracer: args.TracerProvider.Tracer(consts.LibraryName),
taskCategoryRegistry: args.TaskCategoryRegistry,
taskCategoryIndex: args.TaskCategoryIndex,
}

// prevent us from trying to serve requests before shard controller is started and ready
Expand Down
6 changes: 3 additions & 3 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type (
hostInfoProvider membership.HostInfoProvider
controller shard.Controller
tracer trace.Tracer
taskCategoryRegistry tasks.CategoryRegistry
taskCategoryIndex tasks.CategoryIndex
}

NewHandlerArgs struct {
Expand All @@ -123,7 +123,7 @@ type (
EventNotifier events.Notifier
ReplicationTaskFetcherFactory replication.TaskFetcherFactory
TracerProvider trace.TracerProvider
TaskCategoryRegistry tasks.CategoryRegistry
TaskCategoryIndex tasks.CategoryIndex
}
)

Expand Down Expand Up @@ -574,7 +574,7 @@ func (h *Handler) RemoveTask(ctx context.Context, request *historyservice.Remove
category = tasks.CategoryReplication
default:
var ok bool
category, ok = h.taskCategoryRegistry.GetCategoryByID(int32(categoryID))
category, ok = h.taskCategoryIndex.GetCategoryByID(int32(categoryID))
if !ok {
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("Invalid task category ID: %v", categoryID))
}
Expand Down
19 changes: 7 additions & 12 deletions service/history/queue_factory_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"go.uber.org/fx"

"go.temporal.io/server/common"
"go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
Expand Down Expand Up @@ -94,8 +93,9 @@ type (
)

var QueueModule = fx.Options(
fx.Provide(QueueSchedulerRateLimiterProvider),
fx.Decorate(archivalTaskCategoryDecorator),
fx.Provide(
QueueSchedulerRateLimiterProvider,
fx.Annotated{
Name: "transferQueueFactory",
Target: NewTransferQueueFactory,
Expand All @@ -112,7 +112,7 @@ var QueueModule = fx.Options(
Name: "archivalQueueFactory",
Target: NewArchivalQueueFactory,
},
getQueueFactories,
queueFactoriesProvider,
),
fx.Invoke(QueueFactoryLifetimeHooks),
)
Expand All @@ -126,23 +126,18 @@ type queueFactorySet struct {
ArchivalQueueFactory QueueFactory `name:"archivalQueueFactory"`
}

// getQueueFactories returns factories for all the enabled queue types.
// queueFactoriesProvider returns factories for all the enabled queue types.
// The archival queue factory is only returned when archival is enabled in the static config.
func getQueueFactories(
func queueFactoriesProvider(
queueFactorySet queueFactorySet,
archivalMetadata archiver.ArchivalMetadata,
registry tasks.CategoryRegistry,
categoryIndex tasks.CategoryIndex,
) ([]QueueFactory, error) {
factories := []QueueFactory{
queueFactorySet.TransferQueueFactory,
queueFactorySet.TimerQueueFactory,
queueFactorySet.VisibilityQueueFactory,
}
if archivalMetadata.GetHistoryConfig().StaticClusterState() == archiver.ArchivalEnabled ||
archivalMetadata.GetVisibilityConfig().StaticClusterState() == archiver.ArchivalEnabled {
if err := registry.RegisterCategory(tasks.CategoryArchival); err != nil {
return nil, err
}
if _, ok := categoryIndex.GetCategoryByID(tasks.CategoryIDArchival); ok {
factories = append(factories, queueFactorySet.ArchivalQueueFactory)
}
return factories, nil
Expand Down
2 changes: 1 addition & 1 deletion service/history/queue_factory_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (c *moduleTestCase) Run(t *testing.T) {
dependencies := getModuleDependencies(controller, c)

var hookParams QueueFactoriesLifetimeHookParams
var categoryRegistry tasks.CategoryRegistry
var categoryRegistry tasks.CategoryIndex
app := fx.New(
dependencies,
QueueModule,
Expand Down
14 changes: 7 additions & 7 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ type (
clusterMetadata cluster.Metadata
archivalMetadata archiver.ArchivalMetadata
hostInfoProvider membership.HostInfoProvider
taskCategoryRegistry tasks.CategoryRegistry
taskCategoryIndex tasks.CategoryIndex

// Context that lives for the lifetime of the shard context
lifecycleCtx context.Context
Expand Down Expand Up @@ -1233,7 +1233,7 @@ func (s *ContextImpl) emitShardInfoMetricsLogsLocked() {
logWarnLagExceeded := false

for categoryID := range s.shardInfo.QueueAckLevels {
category, ok := s.taskCategoryRegistry.GetCategoryByID(categoryID)
category, ok := s.taskCategoryIndex.GetCategoryByID(categoryID)
if !ok {
continue
}
Expand Down Expand Up @@ -1264,7 +1264,7 @@ func (s *ContextImpl) emitShardInfoMetricsLogsLocked() {
if logWarnLagExceeded && s.config.EmitShardLagLog() {
ackLevelTags := make([]tag.Tag, 0, len(s.shardInfo.QueueAckLevels))
for categoryID, ackLevel := range s.shardInfo.QueueAckLevels {
category, ok := s.taskCategoryRegistry.GetCategoryByID(categoryID)
category, ok := s.taskCategoryIndex.GetCategoryByID(categoryID)
if !ok {
continue
}
Expand Down Expand Up @@ -1666,7 +1666,7 @@ func (s *ContextImpl) notifyQueueProcessor() {

now := s.timeSource.Now()
fakeTasks := make(map[tasks.Category][]tasks.Task)
for _, category := range s.taskCategoryRegistry.GetCategories() {
for _, category := range s.taskCategoryIndex.GetCategories() {
fakeTasks[category] = []tasks.Task{tasks.NewFakeTask(definition.WorkflowKey{}, category, now)}
}

Expand Down Expand Up @@ -1748,7 +1748,7 @@ func (s *ContextImpl) loadShardMetadata(ownershipChanged *bool) error {
remoteClusterInfos := make(map[string]*remoteClusterInfo)
var scheduledTaskMaxReadLevel time.Time
currentClusterName := s.GetClusterMetadata().GetCurrentClusterName()
taskCategories := s.taskCategoryRegistry.GetCategories()
taskCategories := s.taskCategoryIndex.GetCategories()
for clusterName, info := range s.GetClusterMetadata().GetAllClusterInfo() {
if !info.Enabled {
continue
Expand Down Expand Up @@ -1984,7 +1984,7 @@ func newContext(
clusterMetadata cluster.Metadata,
archivalMetadata archiver.ArchivalMetadata,
hostInfoProvider membership.HostInfoProvider,
taskCategoryRegistry tasks.CategoryRegistry,
taskCategoryIndex tasks.CategoryIndex,
) (*ContextImpl, error) {
hostIdentity := hostInfoProvider.HostInfo().Identity()

Expand Down Expand Up @@ -2012,7 +2012,7 @@ func newContext(
clusterMetadata: clusterMetadata,
archivalMetadata: archivalMetadata,
hostInfoProvider: hostInfoProvider,
taskCategoryRegistry: taskCategoryRegistry,
taskCategoryIndex: taskCategoryIndex,
handoverNamespaces: make(map[namespace.Name]*namespaceHandOverInfo),
lifecycleCtx: lifecycleCtx,
lifecycleCancel: lifecycleCancel,
Expand Down
2 changes: 1 addition & 1 deletion service/history/shard/context_testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func NewTestContext(
historyClient: resourceTest.GetHistoryClient(),
archivalMetadata: resourceTest.GetArchivalMetadata(),
hostInfoProvider: hostInfoProvider,
taskCategoryRegistry: resourceTest.GetTaskCategoryRegistry(),
taskCategoryIndex: resourceTest.GetTaskCategoryIndex(),
}
return &ContextTest{
Resource: resourceTest,
Expand Down
4 changes: 2 additions & 2 deletions service/history/shard/controller_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type (
archivalMetadata archiver.ArchivalMetadata
hostInfoProvider membership.HostInfoProvider
tracer trace.Tracer
taskCategoryRegistry tasks.CategoryRegistry
taskCategoryIndex tasks.CategoryIndex
}
)

Expand Down Expand Up @@ -311,7 +311,7 @@ func (c *ControllerImpl) getOrCreateShardContext(shardID int32) (*ContextImpl, e
c.clusterMetadata,
c.archivalMetadata,
c.hostInfoProvider,
c.taskCategoryRegistry,
c.taskCategoryIndex,
)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion service/history/shard/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func NewTestController(
clusterMetadata: resource.GetClusterMetadata(),
archivalMetadata: resource.GetArchivalMetadata(),
hostInfoProvider: hostInfoProvider,
taskCategoryRegistry: resource.GetTaskCategoryRegistry(),
taskCategoryIndex: resource.GetTaskCategoryIndex(),

status: common.DaemonStatusInitialized,
membershipUpdateCh: make(chan *membership.ChangedEvent, 10),
Expand Down
4 changes: 2 additions & 2 deletions service/history/shard/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func ControllerProvider(
hostInfoProvider membership.HostInfoProvider,
engineFactory EngineFactory,
tracerProvider trace.TracerProvider,
taskCategoryRegistry tasks.CategoryRegistry,
taskCategoryIndex tasks.CategoryIndex,
) Controller {
return &ControllerImpl{
status: common.DaemonStatusInitialized,
Expand Down Expand Up @@ -102,6 +102,6 @@ func ControllerProvider(
hostInfoProvider: hostInfoProvider,
engineFactory: engineFactory,
tracer: tracerProvider.Tracer(consts.LibraryName),
taskCategoryRegistry: taskCategoryRegistry,
taskCategoryIndex: taskCategoryIndex,
}
}
51 changes: 51 additions & 0 deletions service/history/tasks/category_index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package tasks

import (
"golang.org/x/exp/maps"
)

// CategoryIndex is an immutable index of all registered Categories
type CategoryIndex interface {
// GetCategories returns a deep copy of all registered Categories
GetCategories() map[int32]Category
// GetCategoryByID returns a registered Category with the same ID
// It returns a bool indicating whether the Category is found
GetCategoryByID(id int32) (Category, bool)
}

type categoryIndex struct {
categories map[int32]Category
}

func (r *categoryIndex) GetCategories() map[int32]Category {
return maps.Clone(r.categories)
}

func (r *categoryIndex) GetCategoryByID(id int32) (Category, bool) {
category, ok := r.categories[id]
return category, ok
}

0 comments on commit 61a16f8

Please sign in to comment.