Skip to content

Commit

Permalink
Add a replication.DLQWriter interface
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Oct 20, 2023
1 parent ef2deea commit 2d1905d
Show file tree
Hide file tree
Showing 6 changed files with 331 additions and 69 deletions.
12 changes: 11 additions & 1 deletion common/persistence/persistence_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,17 @@ type (
RecordExpiry time.Time
}

// QueueV2 is an interface for a generic FIFO queue. It should eventually supersede the Queue interface.
// QueueV2 is an interface for a generic FIFO queue. It should eventually replace the Queue interface. Why do we
// need this migration? The main problem is very simple. The `queue_metadata` table in Cassandra has a primary key
// of (queue_type). This means that we can only have one queue of each type. This is a problem because we want to
// have multiple queues of the same type, but with different names. For example, we want to have a DLQ for
// replication tasks from one cluster to another, and cluster names are dynamic, so we can't create separate static
// queue types for each cluster. The solution is to add a queue_name column to the table, and make the primary key
// (queue_type, queue_name). This allows us to have multiple queues of the same type, but with different names.
// Since the new table (which is called `queues` in Cassandra), supports dynamic names, the interface built around
// it should also support dynamic names. This is why we need a new interface. There are other types built on top of
// this up the stack, like HistoryTaskQueueManager, for which the same principle of needing a new type because we
// now support dynamic names applies.
QueueV2 interface {
// EnqueueMessage adds a message to the back of the queue.
EnqueueMessage(
Expand Down
32 changes: 19 additions & 13 deletions common/persistence/serialization/task_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,16 +263,9 @@ func (s *TaskSerializer) deserializeVisibilityTasks(
func (s *TaskSerializer) serializeReplicationTask(
task tasks.Task,
) (commonpb.DataBlob, error) {
var replicationTask *persistencespb.ReplicationTaskInfo
switch task := task.(type) {
case *tasks.SyncActivityTask:
replicationTask = s.replicationActivityTaskToProto(task)
case *tasks.HistoryReplicationTask:
replicationTask = s.replicationHistoryTaskToProto(task)
case *tasks.SyncWorkflowStateTask:
replicationTask = s.replicationSyncWorkflowStateTaskToProto(task)
default:
return commonpb.DataBlob{}, serviceerror.NewInternal(fmt.Sprintf("Unknown repication task type: %v", task))
replicationTask, ok := ReplicationTaskInfoFromTask(task)
if !ok {
return commonpb.DataBlob{}, serviceerror.NewInternal(fmt.Sprintf("Unknown replication task type: %v", task))
}

blob, err := ReplicationTaskInfoToBlob(replicationTask)
Expand All @@ -282,6 +275,19 @@ func (s *TaskSerializer) serializeReplicationTask(
return blob, nil
}

func ReplicationTaskInfoFromTask(task tasks.Task) (*persistencespb.ReplicationTaskInfo, bool) {
switch task := task.(type) {
case *tasks.SyncActivityTask:
return replicationActivityTaskToProto(task), true
case *tasks.HistoryReplicationTask:
return replicationHistoryTaskToProto(task), true
case *tasks.SyncWorkflowStateTask:
return replicationSyncWorkflowStateTaskToProto(task), true
default:
return nil, false
}
}

func (s *TaskSerializer) deserializeReplicationTasks(
blob commonpb.DataBlob,
) (tasks.Task, error) {
Expand Down Expand Up @@ -1014,7 +1020,7 @@ func (s *TaskSerializer) visibilityDeleteTaskFromProto(
}
}

func (s *TaskSerializer) replicationActivityTaskToProto(
func replicationActivityTaskToProto(
activityTask *tasks.SyncActivityTask,
) *persistencespb.ReplicationTaskInfo {
return &persistencespb.ReplicationTaskInfo{
Expand Down Expand Up @@ -1053,7 +1059,7 @@ func (s *TaskSerializer) replicationActivityTaskFromProto(
}
}

func (s *TaskSerializer) replicationHistoryTaskToProto(
func replicationHistoryTaskToProto(
historyTask *tasks.HistoryReplicationTask,
) *persistencespb.ReplicationTaskInfo {
return &persistencespb.ReplicationTaskInfo{
Expand Down Expand Up @@ -1130,7 +1136,7 @@ func (s *TaskSerializer) archiveExecutionTaskFromProto(
}
}

func (s *TaskSerializer) replicationSyncWorkflowStateTaskToProto(
func replicationSyncWorkflowStateTaskToProto(
syncWorkflowStateTask *tasks.SyncWorkflowStateTask,
) *persistencespb.ReplicationTaskInfo {
return &persistencespb.ReplicationTaskInfo{
Expand Down
127 changes: 127 additions & 0 deletions service/history/replication/dlq_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package replication

import (
"context"
"errors"
"fmt"

"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/service/history/tasks"
)

type (
// DLQWriter is an interface that can be implemented easily by the two different queue solutions that we have.
// - Queue V1 implements this interface via [persistence.ExecutionManager].
// - Queue V2 will implement this interface via [go.temporal.io/server/service/history/queues.DLQWriter].
//
// We want this interface to make the migration referenced by [persistence.QueueV2] easier.
DLQWriter interface {
WriteTaskToDLQ(ctx context.Context, request WriteRequest) error
}
// WriteRequest is a request to write a task to the DLQ.
WriteRequest struct {
SourceCluster string
NamespaceID string
WorkflowID string
Task tasks.Task
}
// ExecutionManager is an interface that is implemented by [persistence.ExecutionManager] that contains the
// only method that we need to implement the [DLQWriter] interface.
ExecutionManager interface {
PutReplicationTaskToDLQ(ctx context.Context, request *persistence.PutReplicationTaskToDLQRequest) error
}
// ShardContext is a [go.temporal.io/server/service/history/shard.Context] interface that contains the only method
// that we need to implement the [DLQWriter] interface.
ShardContext[T ExecutionManager] interface {
GetExecutionManager() T
}
// ShardController is a [go.temporal.io/server/service/history/shard.Controller] interface that contains the only
// method that we need to implement the [DLQWriter] interface.
ShardController[T ExecutionManager, U ShardContext[T]] interface {
GetShardByNamespaceWorkflow(namespaceID namespace.ID, workflowID string) (U, error)
}
// This type adapts an [ShardController] to the [DLQWriter] interface.
shardControllerDLQWriter[T ExecutionManager, U ShardContext[T], V ShardController[T, U]] struct {
shardController V
numHistoryShards int
}
)

var (
ErrInvalidNumHistoryShards = errors.New("invalid number of history shards")
ErrNonReplicationTask = errors.New("task to be written to DLQ is not a replication task")
)

// NewShardControllerDLQWriter creates a new DLQWriter that writes to the DLQ using an [ExecutionManager] provided by
// the given shard controller. There's a lot of generics going on here, so let's break it down:
// - [T] is the type of the [ExecutionManager] that the [ShardContext] provides.
// - [U] is the type of the [ShardContext] that the [ShardController] provides.
// - [V] is the type of the [ShardController] that we're adapting to the [DLQWriter] interface.
//
// Why do we need all of these generics? Because we want to decouple this from the shard package, and make it easier to
// write tests without leaving methods unimplemented.
func NewShardControllerDLQWriter[T ExecutionManager, U ShardContext[T], V ShardController[T, U]](
shardController V,
numHistoryShards int,
) (DLQWriter, error) {
if numHistoryShards <= 0 {
return nil, fmt.Errorf("%w: %v", ErrInvalidNumHistoryShards, numHistoryShards)
}
return &shardControllerDLQWriter[T, U, V]{
shardController: shardController,
numHistoryShards: numHistoryShards,
}, nil
}

// WriteTaskToDLQ implements [DLQWriter.WriteTaskToDLQ] by calling [ExecutionManager.PutReplicationTaskToDLQ].
// It returns an error if the task is not a replication task.
func (e *shardControllerDLQWriter[T, U, V]) WriteTaskToDLQ(
ctx context.Context,
request WriteRequest,
) error {
task := request.Task
shardID := tasks.GetShardIDForTask(task, e.numHistoryShards)
replicationTaskInfo, ok := serialization.ReplicationTaskInfoFromTask(task)
if !ok {
return fmt.Errorf("%w: %T, shardID: %v, taskID: %v", ErrNonReplicationTask, task, shardID, task.GetTaskID())
}
shardContext, err := e.shardController.GetShardByNamespaceWorkflow(
namespace.ID(request.NamespaceID),
request.WorkflowID,
)
if err != nil {
return err
}
executionManager := shardContext.GetExecutionManager()
return executionManager.PutReplicationTaskToDLQ(ctx, &persistence.PutReplicationTaskToDLQRequest{
SourceClusterName: request.SourceCluster,
ShardID: int32(shardID),
TaskInfo: replicationTaskInfo,
})
}
138 changes: 138 additions & 0 deletions service/history/replication/dlq_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package replication_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/replication"
"go.temporal.io/server/service/history/tasks"
)

type (
fakeShardController struct {
requests []*persistence.PutReplicationTaskToDLQRequest
getShardErr error
}
)

func TestNewExecutionManagerDLQWriter_ErrInvalidNumHistoryShards(t *testing.T) {
t.Parallel()

_, err := newDLQWriter(newFakeShardController(), 0)
assert.ErrorIs(t, err, replication.ErrInvalidNumHistoryShards)
}

func TestNewExecutionManagerDLQWriter_NonReplicationTask(t *testing.T) {
t.Parallel()

shardController := newFakeShardController()
numHistoryShards := 1
writer, err := newDLQWriter(shardController, numHistoryShards)
require.NoError(t, err)
err = writer.WriteTaskToDLQ(context.Background(), replication.WriteRequest{
Task: &tasks.WorkflowTask{
TaskID: 42,
},
})
require.ErrorIs(t, err, replication.ErrNonReplicationTask)
assert.ErrorContains(t, err, "42")
}

func TestNewExecutionManagerDLQWriter_ReplicationTask(t *testing.T) {
t.Parallel()

shardController := newFakeShardController()
writer, err := replication.NewShardControllerDLQWriter[*fakeShardController, *fakeShardController](
shardController,
1,
)
require.NoError(t, err)
err = writer.WriteTaskToDLQ(context.Background(), replication.WriteRequest{
Task: &tasks.HistoryReplicationTask{
TaskID: 42,
},
})
require.NoError(t, err)
if assert.Len(t, shardController.requests, 1) {
assert.Equal(t, int32(1), shardController.requests[0].ShardID)
assert.Equal(t, int64(42), shardController.requests[0].TaskInfo.GetTaskId())
}
}

func TestNewExecutionManagerDLQWriter_GetShardErr(t *testing.T) {
t.Parallel()

shardController := newFakeShardController()
shardController.getShardErr = assert.AnError
writer, err := replication.NewShardControllerDLQWriter[*fakeShardController, *fakeShardController](
shardController,
1,
)
require.NoError(t, err)
err = writer.WriteTaskToDLQ(context.Background(), replication.WriteRequest{
Task: &tasks.HistoryReplicationTask{
TaskID: 42,
},
})
require.ErrorIs(t, err, assert.AnError)
assert.Empty(t, shardController.requests)
}

func newDLQWriter(shardController *fakeShardController, numHistoryShards int) (replication.DLQWriter, error) {
writer, err := replication.NewShardControllerDLQWriter[*fakeShardController, *fakeShardController](
shardController,
numHistoryShards,
)
return writer, err
}

func newFakeShardController() *fakeShardController {
return &fakeShardController{}
}

func (f *fakeShardController) GetExecutionManager() *fakeShardController {
return f
}

func (f *fakeShardController) GetShardByNamespaceWorkflow(namespace.ID, string) (*fakeShardController, error) {
if f.getShardErr != nil {
return nil, f.getShardErr
}
return f, nil
}

func (f *fakeShardController) PutReplicationTaskToDLQ(
_ context.Context,
request *persistence.PutReplicationTaskToDLQRequest,
) error {
f.requests = append(f.requests, request)
return nil
}

0 comments on commit 2d1905d

Please sign in to comment.