Skip to content

Commit

Permalink
Provide HistoryTaskQueueManager from persistence module
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Sep 29, 2023
1 parent a9249b9 commit 194d169
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 24 deletions.
19 changes: 12 additions & 7 deletions common/persistence/client/bean_fx.go
Expand Up @@ -39,13 +39,14 @@ var BeanModule = fx.Options(
fx.Invoke(BeanLifetimeHooks),
)

var BeanDepsModule = fx.Options(
fx.Provide(ClusterMetadataManagerProvider),
fx.Provide(MetadataManagerProvider),
fx.Provide(TaskManagerProvider),
fx.Provide(NamespaceReplicationQueueProvider),
fx.Provide(ShardManagerProvider),
fx.Provide(ExecutionManagerProvider),
var BeanDepsModule = fx.Provide(
ClusterMetadataManagerProvider,
MetadataManagerProvider,
TaskManagerProvider,
NamespaceReplicationQueueProvider,
ShardManagerProvider,
ExecutionManagerProvider,
HistoryTaskQueueManagerProvider,
)

func BeanProvider(
Expand Down Expand Up @@ -89,6 +90,10 @@ func ExecutionManagerProvider(factory Factory) (persistence.ExecutionManager, er
return factory.NewExecutionManager()
}

func HistoryTaskQueueManagerProvider(factory Factory) (persistence.HistoryTaskQueueManager, error) {
return factory.NewHistoryTaskQueueManager()
}

func BeanLifetimeHooks(
lc fx.Lifecycle,
bean Bean,
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/client/factory.go
Expand Up @@ -58,6 +58,8 @@ type (
NewNamespaceReplicationQueue() (p.NamespaceReplicationQueue, error)
// NewClusterMetadataManager returns a new manager for cluster specific metadata
NewClusterMetadataManager() (p.ClusterMetadataManager, error)
// NewHistoryTaskQueueManager returns a new manager for history task queues
NewHistoryTaskQueueManager() (p.HistoryTaskQueueManager, error)
}

factoryImpl struct {
Expand Down Expand Up @@ -212,6 +214,14 @@ func (f *factoryImpl) NewNamespaceReplicationQueue() (p.NamespaceReplicationQueu
return p.NewNamespaceReplicationQueue(result, f.serializer, f.clusterName, f.metricsHandler, f.logger)
}

func (f *factoryImpl) NewHistoryTaskQueueManager() (p.HistoryTaskQueueManager, error) {
q, err := f.dataStoreFactory.NewQueueV2()
if err != nil {
return nil, err
}
return p.NewHistoryTaskQueueManager(q, int(f.config.NumHistoryShards)), nil
}

// Close closes this factory
func (f *factoryImpl) Close() {
f.dataStoreFactory.Close()
Expand Down
81 changes: 81 additions & 0 deletions common/persistence/client/factory_test.go
@@ -0,0 +1,81 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package client_test

import (
"errors"
"testing"

"github.com/stretchr/testify/require"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/persistence/client"
)

func TestFactoryImpl_NewHistoryTaskQueueManager(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
err error
}{
{
name: "No error",
err: nil,
},
{
name: "Error",
err: errors.New("some error"),
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

dataStoreFactory := &testDataStoreFactory{
err: tc.err,
}
factory := client.NewFactory(
dataStoreFactory,
&config.Persistence{
NumHistoryShards: 1,
},
nil,
nil,
nil,
"",
nil,
nil,
nil,
)
historyTaskQueueManager, err := factory.NewHistoryTaskQueueManager()
if tc.err != nil {
require.Error(t, err)
return
}
require.NoError(t, err)
require.NotNil(t, historyTaskQueueManager)
})
}
}
7 changes: 3 additions & 4 deletions common/persistence/data_interfaces.go
Expand Up @@ -1205,10 +1205,9 @@ type (
DeleteClusterMetadata(ctx context.Context, request *DeleteClusterMetadataRequest) error
}

// HistoryTaskQueueManager is responsible for managing a queue of internal history tasks. It is currently unused,
// but we plan on using this to implement a DLQ for history tasks. This is called a history task queue manager, but
// the actual history task queues are not managed by this object. Instead, this object is responsible for managing
// a generic queue of history tasks (which is what the history task DLQ will be).
// HistoryTaskQueueManager is responsible for managing a queue of internal history tasks. This is called a history
// task queue manager, but the actual history task queues are not managed by this object. Instead, this object is
// responsible for managing a generic queue of history tasks (which is what the history task DLQ is).
HistoryTaskQueueManager interface {
EnqueueTask(ctx context.Context, request *EnqueueTaskRequest) (*EnqueueTaskResponse, error)
ReadRawTasks(
Expand Down
13 changes: 0 additions & 13 deletions service/history/fx.go
Expand Up @@ -39,7 +39,6 @@ import (
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
persistenceClient "go.temporal.io/server/common/persistence/client"
"go.temporal.io/server/common/persistence/visibility"
"go.temporal.io/server/common/persistence/visibility/manager"
Expand Down Expand Up @@ -83,7 +82,6 @@ var Module = fx.Options(
fx.Provide(HistoryEngineFactoryProvider),
fx.Provide(HandlerProvider),
fx.Provide(ServiceProvider),
fx.Provide(TaskQueueManagerProvider), // TODO: provide via persistence factory module
fx.Invoke(ServiceLifetimeHooks),
)

Expand Down Expand Up @@ -281,14 +279,3 @@ func EventNotifierProvider(
func ServiceLifetimeHooks(lc fx.Lifecycle, svc *Service) {
lc.Append(fx.StartStopHook(svc.Start, svc.Stop))
}

func TaskQueueManagerProvider(
cfg *configs.Config,
factory persistenceClient.DataStoreFactory,
) (persistence.HistoryTaskQueueManager, error) {
queueV2, err := factory.NewQueueV2()
if err != nil {
return nil, err
}
return persistence.NewHistoryTaskQueueManager(queueV2, int(cfg.NumberOfShards)), err
}

0 comments on commit 194d169

Please sign in to comment.