Skip to content

Commit

Permalink
Add a replication.DLQWriter interface
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Oct 20, 2023
1 parent 5809a24 commit 10c3ca5
Show file tree
Hide file tree
Showing 22 changed files with 267 additions and 96 deletions.
12 changes: 11 additions & 1 deletion common/persistence/persistence_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,17 @@ type (
RecordExpiry time.Time
}

// QueueV2 is an interface for a generic FIFO queue. It should eventually supersede the Queue interface.
// QueueV2 is an interface for a generic FIFO queue. It should eventually replace the Queue interface. Why do we
// need this migration? The main problem is very simple. The `queue_metadata` table in Cassandra has a primary key
// of (queue_type). This means that we can only have one queue of each type. This is a problem because we want to
// have multiple queues of the same type, but with different names. For example, we want to have a DLQ for
// replication tasks from one cluster to another, and cluster names are dynamic, so we can't create separate static
// queue types for each cluster. The solution is to add a queue_name column to the table, and make the primary key
// (queue_type, queue_name). This allows us to have multiple queues of the same type, but with different names.
// Since the new table (which is called `queues` in Cassandra), supports dynamic names, the interface built around
// it should also support dynamic names. This is why we need a new interface. There are other types built on top of
// this up the stack, like HistoryTaskQueueManager, for which the same principle of needing a new type because we
// now support dynamic names applies.
QueueV2 interface {
// EnqueueMessage adds a message to the back of the queue.
EnqueueMessage(
Expand Down
2 changes: 2 additions & 0 deletions service/history/history_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func NewEngineWithShardContext(
persistenceVisibilityMgr manager.VisibilityManager,
eventBlobCache persistence.XDCCache,
taskCategoryRegistry tasks.TaskCategoryRegistry,
dlqWriter replication.DLQWriter,
) shard.Engine {
currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()

Expand Down Expand Up @@ -283,6 +284,7 @@ func NewEngineWithShardContext(
eventSerializer,
replicationTaskFetcherFactory,
replicationTaskExecutorProvider,
dlqWriter,
)
return historyEngImpl
}
Expand Down
2 changes: 2 additions & 0 deletions service/history/history_engine_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type (
PersistenceVisibilityMgr manager.VisibilityManager
EventBlobCache persistence.XDCCache
TaskCategoryRegistry tasks.TaskCategoryRegistry
ReplicationDLQWriter replication.DLQWriter
}

historyEngineFactory struct {
Expand Down Expand Up @@ -99,5 +100,6 @@ func (f *historyEngineFactory) CreateEngine(
f.PersistenceVisibilityMgr,
f.EventBlobCache,
f.TaskCategoryRegistry,
f.ReplicationDLQWriter,
)
}
73 changes: 73 additions & 0 deletions service/history/replication/dlq_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package replication

import (
"context"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/shard"
)

type (
// DLQWriter is an interface that can be implemented easily by the two different queue solutions that we have.
// - Queue V1 implements this interface via [shard.Context] and [persistence.ExecutionManager].
// - Queue V2 will implement this interface via [go.temporal.io/server/service/history/queues.DLQWriter].
//
// We want this interface to make the migration referenced by [persistence.QueueV2] easier.
DLQWriter interface {
WriteTaskToDLQ(ctx context.Context, request WriteRequest) error
}
// WriteRequest is a request to write a task to the DLQ.
WriteRequest struct {
// ShardContext is an argument that we can remove once we migrate to queue V2.
ShardContext shard.Context
SourceCluster string
ReplicationTaskInfo *persistencespb.ReplicationTaskInfo
}
// ShardContextDLQWriter is a [DLQWriter] that uses a [shard.Context].
// The zero-value is a valid instance.
ShardContextDLQWriter struct{}
)

// NewShardContextDLQWriter creates a new DLQWriter.
func NewShardContextDLQWriter() DLQWriter {
return &ShardContextDLQWriter{}
}

// WriteTaskToDLQ implements [DLQWriter.WriteTaskToDLQ] by calling [persistence.ExecutionManager.PutReplicationTaskToDLQ].
func (e *ShardContextDLQWriter) WriteTaskToDLQ(
ctx context.Context,
request WriteRequest,
) error {
return request.ShardContext.GetExecutionManager().PutReplicationTaskToDLQ(
ctx, &persistence.PutReplicationTaskToDLQRequest{
SourceClusterName: request.SourceCluster,
ShardID: request.ShardContext.GetShardID(),
TaskInfo: request.ReplicationTaskInfo,
},
)
}
88 changes: 88 additions & 0 deletions service/history/replication/dlq_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package replication_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/replication"
"go.temporal.io/server/service/history/shard"
)

type (
fakeShardContext struct {
shard.Context
requests []*persistence.PutReplicationTaskToDLQRequest
shardID int
}
fakeExecutionManager struct {
persistence.ExecutionManager
requests *[]*persistence.PutReplicationTaskToDLQRequest
}
)

func TestNewExecutionManagerDLQWriter_ReplicationTask(t *testing.T) {
t.Parallel()

writer := replication.NewShardContextDLQWriter()
shardContext := &fakeShardContext{
shardID: 13,
}
replicationTaskInfo := &persistencespb.ReplicationTaskInfo{
TaskId: 21,
}
err := writer.WriteTaskToDLQ(context.Background(), replication.WriteRequest{
ShardContext: shardContext,
SourceCluster: "test-source-cluster",
ReplicationTaskInfo: replicationTaskInfo,
})
require.NoError(t, err)
if assert.Len(t, shardContext.requests, 1) {
assert.Equal(t, 13, int(shardContext.requests[0].ShardID))
assert.Equal(t, "test-source-cluster", shardContext.requests[0].SourceClusterName)
assert.Equal(t, replicationTaskInfo, shardContext.requests[0].TaskInfo)
}
}

func (f *fakeShardContext) GetShardID() int32 {
return int32(f.shardID)
}

func (f *fakeShardContext) GetExecutionManager() persistence.ExecutionManager {
return &fakeExecutionManager{requests: &f.requests}
}

func (f *fakeExecutionManager) PutReplicationTaskToDLQ(
_ context.Context,
request *persistence.PutReplicationTaskToDLQRequest,
) error {
*f.requests = append(*f.requests, request)
return nil
}
27 changes: 13 additions & 14 deletions service/history/replication/executable_activity_state_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
serviceerrors "go.temporal.io/server/common/serviceerror"
ctasks "go.temporal.io/server/common/tasks"
)
Expand Down Expand Up @@ -182,18 +181,14 @@ func (e *ExecutableActivityStateTask) MarkPoisonPill() error {
}

// TODO: GetShardID will break GetDLQReplicationMessages we need to handle DLQ for cross shard replication.
req := &persistence.PutReplicationTaskToDLQRequest{
ShardID: shardContext.GetShardID(),
SourceClusterName: e.ExecutableTask.SourceClusterName(),
TaskInfo: &persistencespb.ReplicationTaskInfo{
NamespaceId: e.NamespaceID,
WorkflowId: e.WorkflowID,
RunId: e.RunID,
TaskId: e.ExecutableTask.TaskID(),
TaskType: enumsspb.TASK_TYPE_REPLICATION_SYNC_ACTIVITY,
ScheduledEventId: e.req.ScheduledEventId,
Version: e.req.Version,
},
replicationTaskInfo := &persistencespb.ReplicationTaskInfo{
NamespaceId: e.NamespaceID,
WorkflowId: e.WorkflowID,
RunId: e.RunID,
TaskId: e.ExecutableTask.TaskID(),
TaskType: enumsspb.TASK_TYPE_REPLICATION_SYNC_ACTIVITY,
ScheduledEventId: e.req.ScheduledEventId,
Version: e.req.Version,
}

e.Logger.Error("enqueue activity state replication task to DLQ",
Expand All @@ -207,5 +202,9 @@ func (e *ExecutableActivityStateTask) MarkPoisonPill() error {
ctx, cancel := newTaskContext(e.NamespaceID)
defer cancel()

return shardContext.GetExecutionManager().PutReplicationTaskToDLQ(ctx, req)
return e.DLQWriter.WriteTaskToDLQ(ctx, WriteRequest{
ShardContext: shardContext,
SourceCluster: e.SourceClusterName(),
ReplicationTaskInfo: replicationTaskInfo,
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (s *executableActivityStateTaskSuite) SetupTest() {
NDCHistoryResender: s.ndcHistoryResender,
MetricsHandler: s.metricsHandler,
Logger: s.logger,
DLQWriter: NewShardContextDLQWriter(),
},
s.taskID,
time.Unix(0, rand.Int63()),
Expand Down
29 changes: 14 additions & 15 deletions service/history/replication/executable_history_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/versionhistory"
serviceerrors "go.temporal.io/server/common/serviceerror"
ctasks "go.temporal.io/server/common/tasks"
Expand Down Expand Up @@ -225,19 +224,15 @@ func (e *ExecutableHistoryTask) MarkPoisonPill() error {
}

// TODO: GetShardID will break GetDLQReplicationMessages we need to handle DLQ for cross shard replication.
req := &persistence.PutReplicationTaskToDLQRequest{
ShardID: shardContext.GetShardID(),
SourceClusterName: e.ExecutableTask.SourceClusterName(),
TaskInfo: &persistencespb.ReplicationTaskInfo{
NamespaceId: e.NamespaceID,
WorkflowId: e.WorkflowID,
RunId: e.RunID,
TaskId: e.ExecutableTask.TaskID(),
TaskType: enumsspb.TASK_TYPE_REPLICATION_HISTORY,
FirstEventId: events[0].GetEventId(),
NextEventId: events[len(events)-1].GetEventId() + 1,
Version: events[0].GetVersion(),
},
taskInfo := &persistencespb.ReplicationTaskInfo{
NamespaceId: e.NamespaceID,
WorkflowId: e.WorkflowID,
RunId: e.RunID,
TaskId: e.ExecutableTask.TaskID(),
TaskType: enumsspb.TASK_TYPE_REPLICATION_HISTORY,
FirstEventId: events[0].GetEventId(),
NextEventId: events[len(events)-1].GetEventId() + 1,
Version: events[0].GetVersion(),
}

e.Logger.Error("enqueue history replication task to DLQ",
Expand All @@ -251,7 +246,11 @@ func (e *ExecutableHistoryTask) MarkPoisonPill() error {
ctx, cancel := newTaskContext(e.NamespaceID)
defer cancel()

return shardContext.GetExecutionManager().PutReplicationTaskToDLQ(ctx, req)
return e.DLQWriter.WriteTaskToDLQ(ctx, WriteRequest{
ShardContext: shardContext,
SourceCluster: e.SourceClusterName(),
ReplicationTaskInfo: taskInfo,
})
}

func (e *ExecutableHistoryTask) getDeserializedEvents() (_ []*historypb.HistoryEvent, _ []*historypb.HistoryEvent, retError error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func (s *executableHistoryTaskSuite) SetupTest() {
Logger: s.logger,
EagerNamespaceRefresher: s.eagerNamespaceRefresher,
EventSerializer: s.eventSerializer,
DLQWriter: NewShardContextDLQWriter(),
}
s.task = NewExecutableHistoryTask(
s.processToolBox,
Expand Down
1 change: 1 addition & 0 deletions service/history/replication/executable_noop_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (s *executableNoopTaskSuite) SetupTest() {
MetricsHandler: s.metricsHandler,
Logger: s.logger,
EagerNamespaceRefresher: s.eagerNamespaceRefresher,
DLQWriter: NewShardContextDLQWriter(),
},
rand.Int63(),
time.Unix(0, rand.Int63()),
Expand Down
1 change: 1 addition & 0 deletions service/history/replication/executable_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func (s *executableTaskSuite) SetupTest() {
MetricsHandler: s.metricsHandler,
Logger: s.logger,
EagerNamespaceRefresher: s.eagerNamespaceRefresher,
DLQWriter: NewShardContextDLQWriter(),
},
rand.Int63(),
"metrics-tag",
Expand Down
1 change: 1 addition & 0 deletions service/history/replication/executable_task_tool_box.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,6 @@ type (
MetricsHandler metrics.Handler
Logger log.Logger
EventSerializer serialization.Serializer
DLQWriter DLQWriter
}
)
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (s *executableUnknownTaskSuite) SetupTest() {
MetricsHandler: s.metricsHandler,
Logger: s.logger,
EagerNamespaceRefresher: s.eagerNamespaceRefresher,
DLQWriter: NewShardContextDLQWriter(),
},
s.taskID,
time.Unix(0, rand.Int63()),
Expand Down
23 changes: 11 additions & 12 deletions service/history/replication/executable_workflow_state_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
serviceerrors "go.temporal.io/server/common/serviceerror"
ctasks "go.temporal.io/server/common/tasks"
)
Expand Down Expand Up @@ -175,16 +174,12 @@ func (e *ExecutableWorkflowStateTask) MarkPoisonPill() error {
}

// TODO: GetShardID will break GetDLQReplicationMessages we need to handle DLQ for cross shard replication.
req := &persistence.PutReplicationTaskToDLQRequest{
ShardID: shardContext.GetShardID(),
SourceClusterName: e.ExecutableTask.SourceClusterName(),
TaskInfo: &persistencespb.ReplicationTaskInfo{
NamespaceId: e.NamespaceID,
WorkflowId: e.WorkflowID,
RunId: e.RunID,
TaskId: e.ExecutableTask.TaskID(),
TaskType: enumsspb.TASK_TYPE_REPLICATION_SYNC_WORKFLOW_STATE,
},
taskInfo := &persistencespb.ReplicationTaskInfo{
NamespaceId: e.NamespaceID,
WorkflowId: e.WorkflowID,
RunId: e.RunID,
TaskId: e.ExecutableTask.TaskID(),
TaskType: enumsspb.TASK_TYPE_REPLICATION_SYNC_WORKFLOW_STATE,
}

e.Logger.Error("enqueue workflow state replication task to DLQ",
Expand All @@ -198,5 +193,9 @@ func (e *ExecutableWorkflowStateTask) MarkPoisonPill() error {
ctx, cancel := newTaskContext(e.NamespaceID)
defer cancel()

return shardContext.GetExecutionManager().PutReplicationTaskToDLQ(ctx, req)
return e.DLQWriter.WriteTaskToDLQ(ctx, WriteRequest{
ShardContext: shardContext,
SourceCluster: e.ExecutableTask.SourceClusterName(),
ReplicationTaskInfo: taskInfo,
})
}

0 comments on commit 10c3ca5

Please sign in to comment.