Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a task category registry object #4953

Merged
merged 1 commit into from Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
711 changes: 348 additions & 363 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions client/history/historytest/clienttest.go
Expand Up @@ -97,7 +97,7 @@ func TestClient(t *testing.T, historyTaskQueueManager persistence.HistoryTaskQue
require.NoError(t, err)
enqueueTasks(t, historyTaskQueueManager, 2, queueKey.SourceCluster, queueKey.TargetCluster)
dlqKey := &commonspb.HistoryDLQKey{
TaskCategory: tasks.CategoryTransfer.ID(),
TaskCategory: int32(tasks.CategoryTransfer.ID()),
SourceCluster: queueKey.SourceCluster,
TargetCluster: queueKey.TargetCluster,
}
Expand Down Expand Up @@ -139,7 +139,7 @@ func readTasks(
for i := 0; i < numTasks; i++ {
res, err := client.GetDLQTasks(context.Background(), &historyservice.GetDLQTasksRequest{
DlqKey: &commonspb.HistoryDLQKey{
TaskCategory: tasks.CategoryTransfer.ID(),
TaskCategory: int32(tasks.CategoryTransfer.ID()),
SourceCluster: sourceCluster,
TargetCluster: targetCluster,
},
Expand All @@ -156,8 +156,9 @@ func readTasks(
func createServer(historyTaskQueueManager persistence.HistoryTaskQueueManager) *grpc.Server {
// TODO: find a better way to create a history handler
historyHandler := historyserver.HandlerProvider(historyserver.NewHandlerArgs{
TaskQueueManager: historyTaskQueueManager,
TracerProvider: fakeTracerProvider{},
TaskQueueManager: historyTaskQueueManager,
TracerProvider: fakeTracerProvider{},
TaskCategoryRegistry: tasks.NewDefaultTaskCategoryRegistry(),
})
grpcServer := grpc.NewServer()
historyservice.RegisterHistoryServiceServer(grpcServer, historyHandler)
Expand Down
3 changes: 2 additions & 1 deletion cmd/tools/tdbg/main.go
Expand Up @@ -27,10 +27,11 @@ package main
import (
"os"

"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/tools/tdbg"
)

func main() {
app := tdbg.NewCliApp(tdbg.NewClientFactory())
app := tdbg.NewCliApp(tdbg.NewClientFactory(), tasks.NewDefaultTaskCategoryRegistry())
_ = app.Run(os.Args)
}
12 changes: 6 additions & 6 deletions common/persistence/sql/execution_tasks.go
Expand Up @@ -146,7 +146,7 @@ func (m *sqlExecutionStore) getHistoryImmediateTasks(

rows, err := m.Db.RangeSelectFromHistoryImmediateTasks(ctx, sqlplugin.HistoryImmediateTasksRangeFilter{
ShardID: request.ShardID,
CategoryID: categoryID,
CategoryID: int32(categoryID),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: not sure if int can be used in sqlplugin interface as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That should be fine. We can convert to/from int32, int64 wherever category ids are serialized or deserialized (e.g. protos / DB objects). However, outside of that, there's no reason for it to be anything other than an int.

InclusiveMinTaskID: inclusiveMinTaskID,
ExclusiveMaxTaskID: exclusiveMaxTaskID,
PageSize: request.BatchSize,
Expand Down Expand Up @@ -200,7 +200,7 @@ func (m *sqlExecutionStore) completeHistoryImmediateTask(

if _, err := m.Db.DeleteFromHistoryImmediateTasks(ctx, sqlplugin.HistoryImmediateTasksFilter{
ShardID: request.ShardID,
CategoryID: categoryID,
CategoryID: int32(categoryID),
TaskID: request.TaskKey.TaskID,
}); err != nil {
return serviceerror.NewUnavailable(
Expand Down Expand Up @@ -229,7 +229,7 @@ func (m *sqlExecutionStore) rangeCompleteHistoryImmediateTasks(

if _, err := m.Db.RangeDeleteFromHistoryImmediateTasks(ctx, sqlplugin.HistoryImmediateTasksRangeFilter{
ShardID: request.ShardID,
CategoryID: categoryID,
CategoryID: int32(categoryID),
InclusiveMinTaskID: request.InclusiveMinTaskKey.TaskID,
ExclusiveMaxTaskID: request.ExclusiveMaxTaskKey.TaskID,
}); err != nil {
Expand Down Expand Up @@ -263,7 +263,7 @@ func (m *sqlExecutionStore) getHistoryScheduledTasks(

rows, err := m.Db.RangeSelectFromHistoryScheduledTasks(ctx, sqlplugin.HistoryScheduledTasksRangeFilter{
ShardID: request.ShardID,
CategoryID: categoryID,
CategoryID: int32(categoryID),
InclusiveMinVisibilityTimestamp: pageToken.Timestamp,
InclusiveMinTaskID: pageToken.TaskID,
ExclusiveMaxVisibilityTimestamp: request.ExclusiveMaxTaskKey.FireTime,
Expand Down Expand Up @@ -313,7 +313,7 @@ func (m *sqlExecutionStore) completeHistoryScheduledTask(

if _, err := m.Db.DeleteFromHistoryScheduledTasks(ctx, sqlplugin.HistoryScheduledTasksFilter{
ShardID: request.ShardID,
CategoryID: categoryID,
CategoryID: int32(categoryID),
VisibilityTimestamp: request.TaskKey.FireTime,
TaskID: request.TaskKey.TaskID,
}); err != nil {
Expand All @@ -338,7 +338,7 @@ func (m *sqlExecutionStore) rangeCompleteHistoryScheduledTasks(
end := request.ExclusiveMaxTaskKey.FireTime
if _, err := m.Db.RangeDeleteFromHistoryScheduledTasks(ctx, sqlplugin.HistoryScheduledTasksRangeFilter{
ShardID: request.ShardID,
CategoryID: categoryID,
CategoryID: int32(categoryID),
InclusiveMinVisibilityTimestamp: start,
ExclusiveMaxVisibilityTimestamp: end,
}); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/sql/execution_util.go
Expand Up @@ -698,7 +698,7 @@ func createImmediateTasks(
ctx context.Context,
tx sqlplugin.Tx,
shardID int32,
categoryID int32,
categoryID int,
immedidateTasks []p.InternalHistoryTask,
) error {
// This is for backward compatiblity.
Expand All @@ -721,7 +721,7 @@ func createImmediateTasks(
for _, task := range immedidateTasks {
immediateTasksRows = append(immediateTasksRows, sqlplugin.HistoryImmediateTasksRow{
ShardID: shardID,
CategoryID: categoryID,
CategoryID: int32(categoryID),
TaskID: task.Key.TaskID,
Data: task.Blob.Data,
DataEncoding: task.Blob.EncodingType.String(),
Expand All @@ -748,7 +748,7 @@ func createScheduledTasks(
ctx context.Context,
tx sqlplugin.Tx,
shardID int32,
categoryID int32,
categoryID int,
scheduledTasks []p.InternalHistoryTask,
) error {
// This is for backward compatiblity.
Expand All @@ -766,7 +766,7 @@ func createScheduledTasks(
for _, task := range scheduledTasks {
scheduledTasksRows = append(scheduledTasksRows, sqlplugin.HistoryScheduledTasksRow{
ShardID: shardID,
CategoryID: categoryID,
CategoryID: int32(categoryID),
VisibilityTimestamp: task.Key.FireTime,
TaskID: task.Key.TaskID,
Data: task.Blob.Data,
Expand Down
Expand Up @@ -778,9 +778,10 @@ message AddTasksRequest {
int32 shard_id = 1;

message Task {
// category is needed to deserialize the tasks. Examples include "transfer", "timer", etc. See the history/tasks
// package for a definitive list. Warning: this is not the same as the stringified value of a TaskCategory enum.
string category = 1;
// category_id is needed to deserialize the tasks. See TaskCategory for a list of options here. However, keep in mind
// that the list of valid options is registered dynamically with the server in the history/tasks package, so that
// enum is not comprehensive.
int32 category_id = 1;
// blob is the serialized task.
temporal.api.common.v1.DataBlob blob = 2;
}
Expand Down
7 changes: 6 additions & 1 deletion service/frontend/admin_handler.go
Expand Up @@ -122,6 +122,8 @@ type (

// DEPRECATED
persistenceExecutionManager persistence.ExecutionManager

taskCategoryRegistry tasks.TaskCategoryRegistry
}

NewAdminHandlerArgs struct {
Expand Down Expand Up @@ -152,6 +154,8 @@ type (

// DEPRECATED
PersistenceExecutionManager persistence.ExecutionManager

CategoryRegistry tasks.TaskCategoryRegistry
}
)

Expand Down Expand Up @@ -201,6 +205,7 @@ func NewAdminHandler(
saManager: args.SaManager,
clusterMetadata: args.ClusterMetadata,
healthServer: args.HealthServer,
taskCategoryRegistry: args.CategoryRegistry,
}
}

Expand Down Expand Up @@ -793,7 +798,7 @@ func (adh *AdminHandler) ListHistoryTasks(
return nil, errTaskRangeNotSet
}

taskCategory, ok := tasks.GetCategoryByID(int32(request.Category))
taskCategory, ok := adh.taskCategoryRegistry.GetCategoryByID(int(request.Category))
if !ok {
return nil, &serviceerror.InvalidArgument{
Message: fmt.Sprintf("unknown task category: %v", request.Category),
Expand Down
5 changes: 3 additions & 2 deletions service/frontend/admin_handler_test.go
Expand Up @@ -170,6 +170,7 @@ func (s *adminHandlerSuite) SetupTest() {
serialization.NewSerializer(),
clock.NewRealTimeSource(),
s.mockResource.GetExecutionManager(),
tasks.NewDefaultTaskCategoryRegistry(),
}
s.mockMetadata.EXPECT().GetCurrentClusterName().Return(uuid.New()).AnyTimes()
s.handler = NewAdminHandler(args)
Expand Down Expand Up @@ -1160,7 +1161,7 @@ func (s *adminHandlerSuite) TestGetDLQTasks() {
blob := &commonpb.DataBlob{}
expectation := s.mockHistoryClient.EXPECT().GetDLQTasks(gomock.Any(), &historyservice.GetDLQTasksRequest{
DlqKey: &commonspb.HistoryDLQKey{
TaskCategory: tasks.CategoryTransfer.ID(),
TaskCategory: int32(tasks.CategoryTransfer.ID()),
SourceCluster: "test-source-cluster",
TargetCluster: "test-target-cluster",
},
Expand All @@ -1187,7 +1188,7 @@ func (s *adminHandlerSuite) TestGetDLQTasks() {
}
response, err := s.handler.GetDLQTasks(context.Background(), &adminservice.GetDLQTasksRequest{
DlqKey: &commonspb.HistoryDLQKey{
TaskCategory: tasks.CategoryTransfer.ID(),
TaskCategory: int32(tasks.CategoryTransfer.ID()),
SourceCluster: "test-source-cluster",
TargetCluster: "test-target-cluster",
},
Expand Down
3 changes: 3 additions & 0 deletions service/frontend/fx.go
Expand Up @@ -65,6 +65,7 @@ import (
"go.temporal.io/server/common/telemetry"
"go.temporal.io/server/service"
"go.temporal.io/server/service/frontend/configs"
"go.temporal.io/server/service/history/tasks"
)

type FEReplicatorNamespaceReplicationQueue persistence.NamespaceReplicationQueue
Expand Down Expand Up @@ -503,6 +504,7 @@ func AdminHandlerProvider(
healthServer *health.Server,
eventSerializer serialization.Serializer,
timeSource clock.TimeSource,
taskCategoryRegistry tasks.TaskCategoryRegistry,
) *AdminHandler {
args := NewAdminHandlerArgs{
persistenceConfig,
Expand Down Expand Up @@ -530,6 +532,7 @@ func AdminHandlerProvider(
eventSerializer,
timeSource,
persistenceExecutionManager,
taskCategoryRegistry,
}
return NewAdminHandler(args)
}
Expand Down
22 changes: 5 additions & 17 deletions service/history/api/addtasks/api.go
Expand Up @@ -34,6 +34,7 @@ import (
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
)
Expand Down Expand Up @@ -63,6 +64,7 @@ func Invoke(
deserializer TaskDeserializer,
numShards int,
req *historyservice.AddTasksRequest,
taskRegistry tasks.TaskCategoryRegistry,
) (*historyservice.AddTasksResponse, error) {
if len(req.Tasks) > maxTasksPerRequest {
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf(
Expand All @@ -83,12 +85,9 @@ func Invoke(
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("Nil task at index: %d", i))
}

category, ok := getCategoryByName(task.Category)
if !ok {
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf(
"Invalid task category: %s",
task.Category,
))
category, err := api.GetTaskCategory(int(task.CategoryId), taskRegistry)
if err != nil {
return nil, err
}

if task.Blob == nil {
Expand Down Expand Up @@ -139,14 +138,3 @@ func Invoke(

return &historyservice.AddTasksResponse{}, nil
}

func getCategoryByName(categoryName string) (tasks.Category, bool) {
categories := tasks.GetCategories()
for _, category := range categories {
if category.Name() == categoryName {
return category, true
}
}

return tasks.Category{}, false
}
13 changes: 7 additions & 6 deletions service/history/api/addtasks/api_test.go
Expand Up @@ -127,8 +127,8 @@ func TestInvoke(t *testing.T) {
blob, err := serializer.SerializeTask(task)
require.NoError(t, err)
params.req.Tasks = append(params.req.Tasks, &historyservice.AddTasksRequest_Task{
Category: task.GetCategory().Name(),
Blob: &blob,
CategoryId: int32(task.GetCategory().ID()),
Blob: &blob,
})
}
}
Expand Down Expand Up @@ -185,11 +185,11 @@ func TestInvoke(t *testing.T) {
{
name: "invalid task category",
configure: func(t *testing.T, params *testParams) {
params.req.Tasks[0].Category = "my-invalid-task-category"
params.req.Tasks[0].CategoryId = -1
params.expectation = func(resp *historyservice.AddTasksResponse, err error) {
require.ErrorAs(t, err, new(*serviceerror.InvalidArgument))
assert.ErrorContains(t, err, "Invalid task category")
assert.ErrorContains(t, err, "my-invalid-task-category")
assert.ErrorContains(t, err, "-1")
}
},
},
Expand Down Expand Up @@ -251,6 +251,7 @@ func TestInvoke(t *testing.T) {
params.deserializer,
params.numShards,
params.req,
tasks.NewDefaultTaskCategoryRegistry(),
)
params.expectation(resp, err)
})
Expand All @@ -276,8 +277,8 @@ func getDefaultTestParams(t *testing.T) *testParams {
ShardId: 1,
Tasks: []*historyservice.AddTasksRequest_Task{
{
Category: tasks.CategoryTransfer.Name(),
Blob: &blob,
CategoryId: int32(tasks.CategoryTransfer.ID()),
Blob: &blob,
},
},
},
Expand Down
4 changes: 3 additions & 1 deletion service/history/api/deletedlqtasks/api.go
Expand Up @@ -32,14 +32,16 @@ import (
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/tasks"
)

func Invoke(
ctx context.Context,
historyTaskQueueManager persistence.HistoryTaskQueueManager,
req *historyservice.DeleteDLQTasksRequest,
registry tasks.TaskCategoryRegistry,
) (*historyservice.DeleteDLQTasksResponse, error) {
category, err := api.GetTaskCategory(int(req.DlqKey.TaskCategory))
category, err := api.GetTaskCategory(int(req.DlqKey.TaskCategory), registry)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions service/history/api/deletedlqtasks/api_test.go
Expand Up @@ -38,6 +38,7 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/persistencetest"
"go.temporal.io/server/service/history/api/deletedlqtasks"
"go.temporal.io/server/service/history/tasks"
)

func TestInvoke_InvalidCategory(t *testing.T) {
Expand All @@ -50,7 +51,7 @@ func TestInvoke_InvalidCategory(t *testing.T) {
SourceCluster: queueKey.SourceCluster,
TargetCluster: queueKey.TargetCluster,
},
})
}, tasks.NewDefaultTaskCategoryRegistry())
require.Error(t, err)
assert.Equal(t, codes.InvalidArgument, serviceerror.ToStatus(err).Code())
assert.ErrorContains(t, err, "-1")
Expand All @@ -62,11 +63,11 @@ func TestInvoke_ErrDeleteMissingMessageIDUpperBound(t *testing.T) {
queueKey := persistencetest.GetQueueKey(t, persistencetest.WithQueueType(persistence.QueueTypeHistoryDLQ))
_, err := deletedlqtasks.Invoke(context.Background(), nil, &historyservice.DeleteDLQTasksRequest{
DlqKey: &commonspb.HistoryDLQKey{
TaskCategory: queueKey.Category.ID(),
TaskCategory: int32(queueKey.Category.ID()),
SourceCluster: queueKey.SourceCluster,
TargetCluster: queueKey.TargetCluster,
},
})
}, tasks.NewDefaultTaskCategoryRegistry())
require.Error(t, err)
assert.Equal(t, codes.InvalidArgument, serviceerror.ToStatus(err).Code())
assert.ErrorContains(t, err, "inclusive_max_task_metadata")
Expand Down