Skip to content

Commit

Permalink
Create a task category registry object
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Oct 10, 2023
1 parent 6a765ec commit 007d25a
Show file tree
Hide file tree
Showing 31 changed files with 340 additions and 202 deletions.
5 changes: 3 additions & 2 deletions client/history/historytest/clienttest.go
Expand Up @@ -156,8 +156,9 @@ func readTasks(
func createServer(historyTaskQueueManager persistence.HistoryTaskQueueManager) *grpc.Server {
// TODO: find a better way to create a history handler
historyHandler := historyserver.HandlerProvider(historyserver.NewHandlerArgs{
TaskQueueManager: historyTaskQueueManager,
TracerProvider: fakeTracerProvider{},
TaskQueueManager: historyTaskQueueManager,
TracerProvider: fakeTracerProvider{},
TaskCategoryRegistry: tasks.NewDefaultTaskCategoryRegistry(),
})
grpcServer := grpc.NewServer()
historyservice.RegisterHistoryServiceServer(grpcServer, historyHandler)
Expand Down
12 changes: 6 additions & 6 deletions common/persistence/sql/execution_tasks.go
Expand Up @@ -146,7 +146,7 @@ func (m *sqlExecutionStore) getHistoryImmediateTasks(

rows, err := m.Db.RangeSelectFromHistoryImmediateTasks(ctx, sqlplugin.HistoryImmediateTasksRangeFilter{
ShardID: request.ShardID,
CategoryID: categoryID,
CategoryID: int32(categoryID),
InclusiveMinTaskID: inclusiveMinTaskID,
ExclusiveMaxTaskID: exclusiveMaxTaskID,
PageSize: request.BatchSize,
Expand Down Expand Up @@ -200,7 +200,7 @@ func (m *sqlExecutionStore) completeHistoryImmediateTask(

if _, err := m.Db.DeleteFromHistoryImmediateTasks(ctx, sqlplugin.HistoryImmediateTasksFilter{
ShardID: request.ShardID,
CategoryID: categoryID,
CategoryID: int32(categoryID),
TaskID: request.TaskKey.TaskID,
}); err != nil {
return serviceerror.NewUnavailable(
Expand Down Expand Up @@ -229,7 +229,7 @@ func (m *sqlExecutionStore) rangeCompleteHistoryImmediateTasks(

if _, err := m.Db.RangeDeleteFromHistoryImmediateTasks(ctx, sqlplugin.HistoryImmediateTasksRangeFilter{
ShardID: request.ShardID,
CategoryID: categoryID,
CategoryID: int32(categoryID),
InclusiveMinTaskID: request.InclusiveMinTaskKey.TaskID,
ExclusiveMaxTaskID: request.ExclusiveMaxTaskKey.TaskID,
}); err != nil {
Expand Down Expand Up @@ -263,7 +263,7 @@ func (m *sqlExecutionStore) getHistoryScheduledTasks(

rows, err := m.Db.RangeSelectFromHistoryScheduledTasks(ctx, sqlplugin.HistoryScheduledTasksRangeFilter{
ShardID: request.ShardID,
CategoryID: categoryID,
CategoryID: int32(categoryID),
InclusiveMinVisibilityTimestamp: pageToken.Timestamp,
InclusiveMinTaskID: pageToken.TaskID,
ExclusiveMaxVisibilityTimestamp: request.ExclusiveMaxTaskKey.FireTime,
Expand Down Expand Up @@ -313,7 +313,7 @@ func (m *sqlExecutionStore) completeHistoryScheduledTask(

if _, err := m.Db.DeleteFromHistoryScheduledTasks(ctx, sqlplugin.HistoryScheduledTasksFilter{
ShardID: request.ShardID,
CategoryID: categoryID,
CategoryID: int32(categoryID),
VisibilityTimestamp: request.TaskKey.FireTime,
TaskID: request.TaskKey.TaskID,
}); err != nil {
Expand All @@ -338,7 +338,7 @@ func (m *sqlExecutionStore) rangeCompleteHistoryScheduledTasks(
end := request.ExclusiveMaxTaskKey.FireTime
if _, err := m.Db.RangeDeleteFromHistoryScheduledTasks(ctx, sqlplugin.HistoryScheduledTasksRangeFilter{
ShardID: request.ShardID,
CategoryID: categoryID,
CategoryID: int32(categoryID),
InclusiveMinVisibilityTimestamp: start,
ExclusiveMaxVisibilityTimestamp: end,
}); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/sql/execution_util.go
Expand Up @@ -698,7 +698,7 @@ func createImmediateTasks(
ctx context.Context,
tx sqlplugin.Tx,
shardID int32,
categoryID int32,
categoryID int,
immedidateTasks []p.InternalHistoryTask,
) error {
// This is for backward compatiblity.
Expand All @@ -721,7 +721,7 @@ func createImmediateTasks(
for _, task := range immedidateTasks {
immediateTasksRows = append(immediateTasksRows, sqlplugin.HistoryImmediateTasksRow{
ShardID: shardID,
CategoryID: categoryID,
CategoryID: int32(categoryID),
TaskID: task.Key.TaskID,
Data: task.Blob.Data,
DataEncoding: task.Blob.EncodingType.String(),
Expand All @@ -748,7 +748,7 @@ func createScheduledTasks(
ctx context.Context,
tx sqlplugin.Tx,
shardID int32,
categoryID int32,
categoryID int,
scheduledTasks []p.InternalHistoryTask,
) error {
// This is for backward compatiblity.
Expand All @@ -766,7 +766,7 @@ func createScheduledTasks(
for _, task := range scheduledTasks {
scheduledTasksRows = append(scheduledTasksRows, sqlplugin.HistoryScheduledTasksRow{
ShardID: shardID,
CategoryID: categoryID,
CategoryID: int32(categoryID),
VisibilityTimestamp: task.Key.FireTime,
TaskID: task.Key.TaskID,
Data: task.Blob.Data,
Expand Down
7 changes: 6 additions & 1 deletion service/frontend/admin_handler.go
Expand Up @@ -123,6 +123,8 @@ type (

// DEPRECATED
persistenceExecutionManager persistence.ExecutionManager

taskCategoryRegistry tasks.TaskCategoryRegistry
}

NewAdminHandlerArgs struct {
Expand Down Expand Up @@ -153,6 +155,8 @@ type (

// DEPRECATED
PersistenceExecutionManager persistence.ExecutionManager

CategoryRegistry tasks.TaskCategoryRegistry
}
)

Expand Down Expand Up @@ -202,6 +206,7 @@ func NewAdminHandler(
saManager: args.SaManager,
clusterMetadata: args.ClusterMetadata,
healthServer: args.HealthServer,
taskCategoryRegistry: args.CategoryRegistry,
}
}

Expand Down Expand Up @@ -794,7 +799,7 @@ func (adh *AdminHandler) ListHistoryTasks(
return nil, errTaskRangeNotSet
}

taskCategory, ok := tasks.GetCategoryByID(int32(request.Category))
taskCategory, ok := adh.taskCategoryRegistry.GetCategoryByID(int(request.Category))
if !ok {
return nil, &serviceerror.InvalidArgument{
Message: fmt.Sprintf("unknown task category: %v", request.Category),
Expand Down
4 changes: 4 additions & 0 deletions service/frontend/admin_handler_test.go
Expand Up @@ -34,7 +34,10 @@ import (

commonpb "go.temporal.io/api/common/v1"
namespacepb "go.temporal.io/api/namespace/v1"

"go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/service/history/tasks"

"google.golang.org/grpc/metadata"

historyclient "go.temporal.io/server/client/history"
Expand Down Expand Up @@ -167,6 +170,7 @@ func (s *adminHandlerSuite) SetupTest() {
serialization.NewSerializer(),
clock.NewRealTimeSource(),
s.mockResource.GetExecutionManager(),
tasks.NewDefaultTaskCategoryRegistry(),
}
s.mockMetadata.EXPECT().GetCurrentClusterName().Return(uuid.New()).AnyTimes()
s.handler = NewAdminHandler(args)
Expand Down
7 changes: 6 additions & 1 deletion service/frontend/fx.go
Expand Up @@ -65,9 +65,12 @@ import (
"go.temporal.io/server/common/telemetry"
"go.temporal.io/server/service"
"go.temporal.io/server/service/frontend/configs"
"go.temporal.io/server/service/history/tasks"
)

type FEReplicatorNamespaceReplicationQueue persistence.NamespaceReplicationQueue
type (
FEReplicatorNamespaceReplicationQueue persistence.NamespaceReplicationQueue
)

var Module = fx.Options(
resource.Module,
Expand Down Expand Up @@ -503,6 +506,7 @@ func AdminHandlerProvider(
healthServer *health.Server,
eventSerializer serialization.Serializer,
timeSource clock.TimeSource,
taskCategoryRegistry tasks.TaskCategoryRegistry,
) *AdminHandler {
args := NewAdminHandlerArgs{
persistenceConfig,
Expand Down Expand Up @@ -530,6 +534,7 @@ func AdminHandlerProvider(
eventSerializer,
timeSource,
persistenceExecutionManager,
taskCategoryRegistry,
}
return NewAdminHandler(args)
}
Expand Down
4 changes: 3 additions & 1 deletion service/history/api/deletedlqtasks/api.go
Expand Up @@ -32,15 +32,17 @@ import (
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/tasks"
)

func Invoke(
ctx context.Context,
historyTaskQueueManager persistence.HistoryTaskQueueManager,
req *historyservice.DeleteDLQTasksRequest,
registry tasks.TaskCategoryRegistry,
) (*historyservice.DeleteDLQTasksResponse, error) {
categoryEnum := req.DlqKey.Category
category, err := api.GetTaskCategory(categoryEnum)
category, err := api.GetTaskCategory(categoryEnum, registry)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions service/history/api/deletedlqtasks/api_test.go
Expand Up @@ -38,6 +38,7 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/persistencetest"
"go.temporal.io/server/service/history/api/deletedlqtasks"
"go.temporal.io/server/service/history/tasks"
)

func TestInvoke_InvalidCategory(t *testing.T) {
Expand All @@ -50,7 +51,7 @@ func TestInvoke_InvalidCategory(t *testing.T) {
SourceCluster: queueKey.SourceCluster,
TargetCluster: queueKey.TargetCluster,
},
})
}, tasks.NewDefaultTaskCategoryRegistry())
require.Error(t, err)
assert.Equal(t, codes.InvalidArgument, serviceerror.ToStatus(err).Code())
assert.ErrorContains(t, err, "Unspecified")
Expand All @@ -66,7 +67,7 @@ func TestInvoke_ErrDeleteMissingMessageIDUpperBound(t *testing.T) {
SourceCluster: queueKey.SourceCluster,
TargetCluster: queueKey.TargetCluster,
},
})
}, tasks.NewDefaultTaskCategoryRegistry())
require.Error(t, err)
assert.Equal(t, codes.InvalidArgument, serviceerror.ToStatus(err).Code())
assert.ErrorContains(t, err, "inclusive_max_task_metadata")
Expand Down
Expand Up @@ -69,7 +69,7 @@ func TestInvoke(t *testing.T, manager persistence.HistoryTaskQueueManager) {
InclusiveMaxTaskMetadata: &historyservice.HistoryDLQTaskMetadata{
MessageId: persistence.FirstQueueMessageID + 1,
},
})
}, tasks.NewDefaultTaskCategoryRegistry())
require.NoError(t, err)
resp, err := manager.ReadRawTasks(ctx, &persistence.ReadRawTasksRequest{
QueueKey: queueKey,
Expand All @@ -92,7 +92,7 @@ func TestInvoke(t *testing.T, manager persistence.HistoryTaskQueueManager) {
InclusiveMaxTaskMetadata: &historyservice.HistoryDLQTaskMetadata{
MessageId: persistence.FirstQueueMessageID,
},
})
}, tasks.NewDefaultTaskCategoryRegistry())
require.Error(t, err)
assert.Equal(t, codes.NotFound, serviceerror.ToStatus(err).Code(), err.Error())
})
Expand Down
4 changes: 3 additions & 1 deletion service/history/api/getdlqtasks/api.go
Expand Up @@ -36,16 +36,18 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/tasks"
)

// Invoke the GetDLQTasks API. All errors returned from this function are already translated into the appropriate type
// from the [serviceerror] package.
func Invoke(
ctx context.Context,
historyTaskQueueManager persistence.HistoryTaskQueueManager,
taskCategoryRegistry tasks.TaskCategoryRegistry,
req *historyservice.GetDLQTasksRequest,
) (*historyservice.GetDLQTasksResponse, error) {
category, err := api.GetTaskCategory(req.DlqKey.Category)
category, err := api.GetTaskCategory(req.DlqKey.Category, taskCategoryRegistry)
if err != nil {
return nil, err
}
Expand Down
15 changes: 11 additions & 4 deletions service/history/api/getdlqtasks/api_test.go
Expand Up @@ -49,11 +49,16 @@ type failingHistoryTaskQueueManager struct {
func TestInvoke_InvalidQueueCategory(t *testing.T) {
t.Parallel()

_, err := getdlqtasks.Invoke(context.Background(), nil, &historyservice.GetDLQTasksRequest{
DlqKey: &historyservice.HistoryDLQKey{
Category: -1,
_, err := getdlqtasks.Invoke(
context.Background(),
nil,
tasks.NewDefaultTaskCategoryRegistry(),
&historyservice.GetDLQTasksRequest{
DlqKey: &historyservice.HistoryDLQKey{
Category: -1,
},
},
})
)

var invalidArgErr *serviceerror.InvalidArgument

Expand All @@ -68,6 +73,7 @@ func TestInvoke_ZeroPageSize(t *testing.T) {
_, err := getdlqtasks.Invoke(
context.Background(),
new(persistence.HistoryTaskQueueManagerImpl),
tasks.NewDefaultTaskCategoryRegistry(),
&historyservice.GetDLQTasksRequest{
DlqKey: &historyservice.HistoryDLQKey{
Category: enumsspb.TaskCategory(tasks.CategoryTransfer.ID()),
Expand All @@ -85,6 +91,7 @@ func TestInvoke_UnavailableError(t *testing.T) {
_, err := getdlqtasks.Invoke(
context.Background(),
failingHistoryTaskQueueManager{},
tasks.NewDefaultTaskCategoryRegistry(),
&historyservice.GetDLQTasksRequest{
DlqKey: &historyservice.HistoryDLQKey{
Category: enumsspb.TaskCategory(tasks.CategoryTransfer.ID()),
Expand Down
1 change: 1 addition & 0 deletions service/history/api/getdlqtasks/getdlqtaskstest/apitest.go
Expand Up @@ -68,6 +68,7 @@ func TestInvoke(t *testing.T, manager persistence.HistoryTaskQueueManager) {
res, err := getdlqtasks.Invoke(
context.Background(),
manager,
tasks.NewDefaultTaskCategoryRegistry(),
&historyservice.GetDLQTasksRequest{
DlqKey: &historyservice.HistoryDLQKey{
Category: enumsspb.TASK_CATEGORY_TRANSFER,
Expand Down
4 changes: 2 additions & 2 deletions service/history/api/task_category.go
Expand Up @@ -33,8 +33,8 @@ import (
"go.temporal.io/server/service/history/tasks"
)

func GetTaskCategory(categoryEnum enums.TaskCategory) (tasks.Category, error) {
category, ok := tasks.GetCategoryByID(int32(categoryEnum))
func GetTaskCategory(categoryEnum enums.TaskCategory, registry tasks.TaskCategoryRegistry) (tasks.Category, error) {
category, ok := registry.GetCategoryByID(int(categoryEnum))
if !ok {
return tasks.Category{}, serviceerror.NewInvalidArgument(
fmt.Sprintf("Invalid queue category: %v", categoryEnum.String()),
Expand Down
8 changes: 5 additions & 3 deletions service/history/api/task_category_test.go
Expand Up @@ -34,16 +34,18 @@ import (

enumspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/tasks"
)

func TestGetTaskCategory(t *testing.T) {
t.Parallel()

category, err := api.GetTaskCategory(enumspb.TASK_CATEGORY_TRANSFER)
registry := tasks.NewDefaultTaskCategoryRegistry()
category, err := api.GetTaskCategory(enumspb.TASK_CATEGORY_TRANSFER, registry)
require.NoError(t, err)
assert.Equal(t, int(enumspb.TASK_CATEGORY_TRANSFER), int(category.ID()))
assert.Equal(t, int(enumspb.TASK_CATEGORY_TRANSFER), category.ID())

_, err = api.GetTaskCategory(enumspb.TASK_CATEGORY_UNSPECIFIED)
_, err = api.GetTaskCategory(enumspb.TASK_CATEGORY_UNSPECIFIED, registry)
require.Error(t, err)
assert.ErrorContains(t, err, "Unspecified")
assert.Equal(t, codes.InvalidArgument, serviceerror.ToStatus(err).Code())
Expand Down
3 changes: 2 additions & 1 deletion service/history/archival_queue_factory_test.go
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"

Expand Down Expand Up @@ -60,7 +61,7 @@ func TestArchivalQueueFactory(t *testing.T) {
ShardId: 0,
RangeId: 1,
QueueStates: map[int32]*persistencespb.QueueState{
tasks.CategoryIDArchival: {
int32(tasks.CategoryIDArchival): {
ReaderStates: nil,
ExclusiveReaderHighWatermark: &persistencespb.TaskKey{
FireTime: timestamp.TimeNowPtrUtc(),
Expand Down
1 change: 1 addition & 0 deletions service/history/fx.go
Expand Up @@ -140,6 +140,7 @@ func HandlerProvider(args NewHandlerArgs) *Handler {
eventNotifier: args.EventNotifier,
tracer: args.TracerProvider.Tracer(consts.LibraryName),
taskQueueManager: args.TaskQueueManager,
taskCategoryRegistry: args.TaskCategoryRegistry,

replicationTaskFetcherFactory: args.ReplicationTaskFetcherFactory,
replicationTaskConverterProvider: args.ReplicationTaskConverterFactory,
Expand Down

0 comments on commit 007d25a

Please sign in to comment.