Skip to content

Commit

Permalink
Add a history task queue manager
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Sep 11, 2023
1 parent 8846224 commit 172b65e
Show file tree
Hide file tree
Showing 9 changed files with 956 additions and 35 deletions.
346 changes: 313 additions & 33 deletions api/persistence/v1/queues.pb.go

Large diffs are not rendered by default.

48 changes: 48 additions & 0 deletions common/persistence/data_interfaces.go
Expand Up @@ -37,6 +37,7 @@ import (
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/server/common/persistence/serialization"

enumsspb "go.temporal.io/server/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
Expand Down Expand Up @@ -1204,6 +1205,53 @@ type (
SaveClusterMetadata(ctx context.Context, request *SaveClusterMetadataRequest) (bool, error)
DeleteClusterMetadata(ctx context.Context, request *DeleteClusterMetadataRequest) error
}

// HistoryTaskQueueManager is responsible for managing a queue of internal history tasks. It is currently unused,
// but we plan on using this to implement a DLQ for history tasks. This is called a history task queue manager, but
// the actual history task queues are not managed by this object. Instead, this object is responsible for managing
// a generic queue of history tasks (which is what the history task DLQ will be).
// TODO: make this an interface and add wrapper classes like retryable client, metrics client, etc.
HistoryTaskQueueManager struct {
queue QueueV2
serializer *serialization.TaskSerializer
numHistoryShards int
}

// QueueKey identifies a history task queue. It is converted to a queue name using the GetQueueName method.
QueueKey struct {
QueueType QueueV2Type
Category tasks.Category
SourceCluster string
// TargetCluster is only used for cross-cluster replication tasks.
TargetCluster string
}

EnqueueTaskRequest struct {
QueueKey QueueKey
Task tasks.Task
}

EnqueueTaskResponse struct {
Metadata MessageMetadata
}

ReadTasksRequest struct {
QueueKey QueueKey
PageSize int
NextPageToken []byte
}

ReadTasksResponse struct {
Tasks []tasks.Task
NextPageToken []byte
}

ReadRawTasksRequest = ReadTasksRequest

ReadRawTasksResponse struct {
Tasks []persistencespb.HistoryTask
NextPageToken []byte
}
)

func (e *InvalidPersistenceRequestError) Error() string {
Expand Down
187 changes: 187 additions & 0 deletions common/persistence/history_task_queue_manager.go
@@ -0,0 +1,187 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package persistence

import (
"context"
"crypto/sha256"
"encoding/base64"
"errors"
"fmt"

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/service/history/tasks"
)

const (
// clusterNamesHashSuffixLength is the number of characters to use from the hash of the cluster names when forming
// the queue name. This is used to avoid name collisions when a cluster name contains the separator character.
clusterNamesHashSuffixLength = 8

ErrMsgSerializeTaskToEnqueue = "failed to serialize history task for task queue"
// ErrMsgDeserializeRawHistoryTask is returned when the raw task cannot be deserialized from the task queue. This error
// is returned when this whole top-level proto cannot be deserialized.
// Raw Task (a proto): <-- when this cannot be deserialized
// - ShardID
// - Blob (a serialized task)
ErrMsgDeserializeRawHistoryTask = "failed to deserialize raw history task from task queue"
// ErrMsgDeserializeHistoryTask is returned when the history task cannot be deserialized from the task queue. This
// error is returned when the blob inside the raw task cannot be deserialized.
// Raw Task (a proto):
// - ShardID
// - Blob (a serialized task) <-- when this cannot be deserialized
ErrMsgDeserializeHistoryTask = "failed to deserialize history task blob"
)

var (
ErrReadTasksNonPositivePageSize = errors.New("page size to read history tasks must be positive")
ErrHistoryTaskBlobIsNil = errors.New("history task from queue has nil blob")
)

func NewTaskQueueManager(queue QueueV2, numHistoryShards int) *HistoryTaskQueueManager {
return &HistoryTaskQueueManager{
queue: queue,
serializer: serialization.NewTaskSerializer(),
numHistoryShards: numHistoryShards,
}
}

func (m *HistoryTaskQueueManager) EnqueueTask(ctx context.Context, request *EnqueueTaskRequest) (*EnqueueTaskResponse, error) {
blob, err := m.serializer.SerializeTask(request.Task)
if err != nil {
return nil, fmt.Errorf("%v: %w", ErrMsgSerializeTaskToEnqueue, err)
}

shardID := tasks.GetShardIDForTask(request.Task, m.numHistoryShards)
task := persistencespb.HistoryTask{
ShardId: int32(shardID),
Blob: &blob,
}
taskBytes, _ := task.Marshal()
blob = commonpb.DataBlob{
EncodingType: enums.ENCODING_TYPE_PROTO3,
Data: taskBytes,
}

message, err := m.queue.EnqueueMessage(ctx, &InternalEnqueueMessageRequest{
QueueType: request.QueueKey.QueueType,
QueueName: request.QueueKey.GetQueueName(),
Blob: blob,
})
if err != nil {
return nil, err
}

return &EnqueueTaskResponse{
Metadata: message.Metadata,
}, nil
}

// ReadRawTasks returns a page of "raw" tasks from the queue. Here's a quick disambiguation of the different types of
// tasks:
//
// - [go.temporal.io/server/api/history/v1.Task]: the proto that is serialized and stored in the database which
// contains a shard ID and a blob of the serialized history task. This is also called a "raw" task.
// - [go.temporal.io/server/service/history/tasks.Task]: the interface that is implemented by all history tasks.
// This is the primary type used in code to represent a history task since it is the most structured.
func (m *HistoryTaskQueueManager) ReadRawTasks(
ctx context.Context,
request *ReadRawTasksRequest,
) (*ReadRawTasksResponse, error) {
if request.PageSize <= 0 {
return nil, fmt.Errorf("%w: %v", ErrReadTasksNonPositivePageSize, request.PageSize)
}

response, err := m.queue.ReadMessages(ctx, &InternalReadMessagesRequest{
QueueType: request.QueueKey.QueueType,
QueueName: request.QueueKey.GetQueueName(),
PageSize: request.PageSize,
NextPageToken: request.NextPageToken,
})
if err != nil {
return nil, err
}

responseTasks := make([]persistencespb.HistoryTask, len(response.Messages))
for i, message := range response.Messages {
err := serialization.Proto3Decode(message.Data.Data, message.Data.EncodingType, &responseTasks[i])
if err != nil {
return nil, fmt.Errorf("%v: %w", ErrMsgDeserializeRawHistoryTask, err)
}
}

return &ReadRawTasksResponse{
Tasks: responseTasks,
NextPageToken: response.NextPageToken,
}, nil
}

// ReadTasks is a convenience method on top of ReadRawTasks that deserializes the tasks into the [tasks.Task] type.
func (m *HistoryTaskQueueManager) ReadTasks(ctx context.Context, request *ReadTasksRequest) (*ReadTasksResponse, error) {
response, err := m.ReadRawTasks(ctx, request)
if err != nil {
return nil, err
}

resTasks := make([]tasks.Task, len(response.Tasks))

for i, rawTask := range response.Tasks {
blob := rawTask.Blob
if blob == nil {
return nil, serialization.NewDeserializationError(enums.ENCODING_TYPE_PROTO3, ErrHistoryTaskBlobIsNil)
}

task, err := m.serializer.DeserializeTask(request.QueueKey.Category, *blob)
if err != nil {
return nil, fmt.Errorf("%v: %w", ErrMsgDeserializeHistoryTask, err)
}

resTasks[i] = task
}

return &ReadTasksResponse{
Tasks: resTasks,
NextPageToken: response.NextPageToken,
}, nil
}

// combineUnique combines the given strings into a single string by hashing the length of each string and the string
// itself. This is used to generate a unique suffix for the queue name.
func combineUnique(strs ...string) string {
h := sha256.New()
for _, str := range strs {
b := sha256.Sum256([]byte(str))
_, _ = h.Write(b[:])
}
return base64.StdEncoding.EncodeToString(h.Sum(nil))
}

func (k QueueKey) GetQueueName() string {
hash := combineUnique(k.SourceCluster, k.TargetCluster)[:clusterNamesHashSuffixLength]
return fmt.Sprintf("%d_%s_%s_%s", k.Category.ID(), k.SourceCluster, k.TargetCluster, hash)
}

0 comments on commit 172b65e

Please sign in to comment.