Skip to content

Commit

Permalink
Split replication executable tool box and task converter (#4838)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
Split replication executable tool box and task converter

<!-- Tell your future self why have you made these changes -->
**Why?**
Isolate fx in struct and task converter logic.


<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**
Code refactoring. Rely on existing unit tests.

<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**


<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
  • Loading branch information
yux0 committed Sep 12, 2023
1 parent b670c79 commit 6a5a86b
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,58 +28,54 @@ import (
"fmt"
"time"

"go.uber.org/fx"

enumsspb "go.temporal.io/server/api/enums/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
"go.temporal.io/server/client"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
ctasks "go.temporal.io/server/common/tasks"
"go.temporal.io/server/common/xdc"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/shard"
)

type (
ProcessToolBox struct {
fx.In
ExecutableTaskConverter interface {
Convert(
taskClusterName string,
clientShardKey ClusterShardKey,
serverShardKey ClusterShardKey,
replicationTasks ...*replicationspb.ReplicationTask,
) []TrackableExecutableTask
}

Config *configs.Config
ClusterMetadata cluster.Metadata
ClientBean client.Bean
ShardController shard.Controller
NamespaceCache namespace.Registry
EagerNamespaceRefresher EagerNamespaceRefresher
NDCHistoryResender xdc.NDCHistoryResender
TaskScheduler ctasks.Scheduler[TrackableExecutableTask]
MetricsHandler metrics.Handler
Logger log.Logger
executableTaskConverterImpl struct {
processToolBox ProcessToolBox
}
)

func (i *ProcessToolBox) ConvertTasks(
func NewExecutableTaskConverter(
processToolBox ProcessToolBox,
) *executableTaskConverterImpl {
return &executableTaskConverterImpl{
processToolBox: processToolBox,
}
}

func (e *executableTaskConverterImpl) Convert(
taskClusterName string,
clientShardKey ClusterShardKey,
serverShardKey ClusterShardKey,
replicationTasks ...*replicationspb.ReplicationTask,
) []TrackableExecutableTask {
tasks := make([]TrackableExecutableTask, len(replicationTasks))
for index, replicationTask := range replicationTasks {
i.MetricsHandler.Counter(metrics.ReplicationTasksRecv.GetMetricName()).Record(
e.processToolBox.MetricsHandler.Counter(metrics.ReplicationTasksRecv.GetMetricName()).Record(
int64(1),
metrics.FromClusterIDTag(serverShardKey.ClusterID),
metrics.ToClusterIDTag(clientShardKey.ClusterID),
metrics.OperationTag(TaskOperationTag(replicationTask)),
)
tasks[index] = i.convertOne(taskClusterName, replicationTask)
tasks[index] = e.convertOne(taskClusterName, replicationTask)
}
return tasks
}

func (i *ProcessToolBox) convertOne(
func (e *executableTaskConverterImpl) convertOne(
taskClusterName string,
replicationTask *replicationspb.ReplicationTask,
) TrackableExecutableTask {
Expand All @@ -93,46 +89,46 @@ func (i *ProcessToolBox) convertOne(
switch replicationTask.GetTaskType() {
case enumsspb.REPLICATION_TASK_TYPE_SYNC_SHARD_STATUS_TASK: // TODO to be deprecated
return NewExecutableNoopTask(
*i,
e.processToolBox,
replicationTask.SourceTaskId,
taskCreationTime,
taskClusterName,
)
case enumsspb.REPLICATION_TASK_TYPE_HISTORY_METADATA_TASK: // TODO to be deprecated
return NewExecutableNoopTask(
*i,
e.processToolBox,
replicationTask.SourceTaskId,
taskCreationTime,
taskClusterName,
)
case enumsspb.REPLICATION_TASK_TYPE_SYNC_ACTIVITY_TASK:
return NewExecutableActivityStateTask(
*i,
e.processToolBox,
replicationTask.SourceTaskId,
taskCreationTime,
replicationTask.GetSyncActivityTaskAttributes(),
taskClusterName,
)
case enumsspb.REPLICATION_TASK_TYPE_SYNC_WORKFLOW_STATE_TASK:
return NewExecutableWorkflowStateTask(
*i,
e.processToolBox,
replicationTask.SourceTaskId,
taskCreationTime,
replicationTask.GetSyncWorkflowStateTaskAttributes(),
taskClusterName,
)
case enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK:
return NewExecutableHistoryTask(
*i,
e.processToolBox,
replicationTask.SourceTaskId,
taskCreationTime,
replicationTask.GetHistoryTaskAttributes(),
taskClusterName,
)
default:
i.Logger.Error(fmt.Sprintf("unknown replication task: %v", replicationTask))
e.processToolBox.Logger.Error(fmt.Sprintf("unknown replication task: %v", replicationTask))
return NewExecutableUnknownTask(
*i,
e.processToolBox,
replicationTask.SourceTaskId,
taskCreationTime,
replicationTask,
Expand Down
56 changes: 56 additions & 0 deletions service/history/replication/executable_task_tool_box.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package replication

import (
"go.uber.org/fx"

"go.temporal.io/server/client"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
ctasks "go.temporal.io/server/common/tasks"
"go.temporal.io/server/common/xdc"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/shard"
)

type (
ProcessToolBox struct {
fx.In

Config *configs.Config
ClusterMetadata cluster.Metadata
ClientBean client.Bean
ShardController shard.Controller
NamespaceCache namespace.Registry
EagerNamespaceRefresher EagerNamespaceRefresher
NDCHistoryResender xdc.NDCHistoryResender
TaskScheduler ctasks.Scheduler[TrackableExecutableTask]
MetricsHandler metrics.Handler
Logger log.Logger
}
)
12 changes: 11 additions & 1 deletion service/history/replication/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ package replication
import (
"context"

"go.uber.org/fx"

"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/persistence"
"go.uber.org/fx"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/client"
Expand All @@ -48,6 +49,7 @@ var Module = fx.Options(
fx.Provide(ReplicationTaskConverterFactoryProvider),
fx.Provide(ReplicationTaskExecutorProvider),
fx.Provide(ReplicationStreamSchedulerProvider),
fx.Provide(ExecutableTaskConverterProvider),
fx.Provide(StreamReceiverMonitorProvider),
fx.Invoke(ReplicationStreamSchedulerLifetimeHooks),
fx.Provide(NDCHistoryResenderProvider),
Expand Down Expand Up @@ -147,11 +149,19 @@ func ReplicationStreamSchedulerLifetimeHooks(
)
}

func ExecutableTaskConverterProvider(
processToolBox ProcessToolBox,
) ExecutableTaskConverter {
return NewExecutableTaskConverter(processToolBox)
}

func StreamReceiverMonitorProvider(
processToolBox ProcessToolBox,
taskConverter ExecutableTaskConverter,
) StreamReceiverMonitor {
return NewStreamReceiverMonitor(
processToolBox,
taskConverter,
processToolBox.Config.EnableReplicationStream(),
)
}
Expand Down
5 changes: 4 additions & 1 deletion service/history/replication/stream_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type (
shutdownChan channel.ShutdownOnce
logger log.Logger
stream Stream
taskConverter ExecutableTaskConverter
}
)

Expand All @@ -73,6 +74,7 @@ func NewClusterShardKey(

func NewStreamReceiver(
processToolBox ProcessToolBox,
taskConverter ExecutableTaskConverter,
clientShardKey ClusterShardKey,
serverShardKey ClusterShardKey,
) *StreamReceiverImpl {
Expand All @@ -92,6 +94,7 @@ func NewStreamReceiver(
clientShardKey,
serverShardKey,
),
taskConverter: taskConverter,
}
}

Expand Down Expand Up @@ -217,7 +220,7 @@ func (r *StreamReceiverImpl) processMessages(
r.logger.Error("StreamReceiver recv stream encountered unexpected err", tag.Error(streamResp.Err))
return streamResp.Err
}
tasks := r.ConvertTasks(
tasks := r.taskConverter.Convert(
clusterName,
r.clientShardKey,
r.serverShardKey,
Expand Down
10 changes: 7 additions & 3 deletions service/history/replication/stream_receiver_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ type (
}
StreamReceiverMonitorImpl struct {
ProcessToolBox
enableStreaming bool
executableTaskConverter ExecutableTaskConverter
enableStreaming bool

status int32
shutdownOnce channel.ShutdownOnce
Expand All @@ -60,11 +61,13 @@ type (

func NewStreamReceiverMonitor(
processToolBox ProcessToolBox,
executableTaskConverter ExecutableTaskConverter,
enableStreaming bool,
) *StreamReceiverMonitorImpl {
return &StreamReceiverMonitorImpl{
ProcessToolBox: processToolBox,
enableStreaming: enableStreaming,
ProcessToolBox: processToolBox,
executableTaskConverter: executableTaskConverter,
enableStreaming: enableStreaming,

status: streamStatusInitialized,
shutdownOnce: channel.NewShutdownOnce(),
Expand Down Expand Up @@ -284,6 +287,7 @@ func (m *StreamReceiverMonitorImpl) doReconcileOutboundStreams(
if _, ok := m.outboundStreams[streamKey]; !ok {
stream := NewStreamReceiver(
m.ProcessToolBox,
m.executableTaskConverter,
streamKey.Client,
streamKey.Server,
)
Expand Down
28 changes: 15 additions & 13 deletions service/history/replication/stream_receiver_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,22 @@ func (s *streamReceiverMonitorSuite) SetupTest() {
s.clientBean = client.NewMockBean(s.controller)
s.shardController = shard.NewMockController(s.controller)

processToolBox := ProcessToolBox{
Config: configs.NewConfig(
dynamicconfig.NewNoopCollection(),
1,
true,
false,
),
ClusterMetadata: s.clusterMetadata,
ClientBean: s.clientBean,
ShardController: s.shardController,
MetricsHandler: metrics.NoopMetricsHandler,
Logger: log.NewNoopLogger(),
}
s.streamReceiverMonitor = NewStreamReceiverMonitor(
ProcessToolBox{
Config: configs.NewConfig(
dynamicconfig.NewNoopCollection(),
1,
true,
false,
),
ClusterMetadata: s.clusterMetadata,
ClientBean: s.clientBean,
ShardController: s.shardController,
MetricsHandler: metrics.NoopMetricsHandler,
Logger: log.NewNoopLogger(),
},
processToolBox,
NewExecutableTaskConverter(processToolBox),
true,
)
streamClient := adminservicemock.NewMockAdminService_StreamWorkflowReplicationMessagesClient(s.controller)
Expand Down
14 changes: 8 additions & 6 deletions service/history/replication/stream_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,15 @@ func (s *streamReceiverSuite) SetupTest() {
tasks: nil,
}

processToolBox := ProcessToolBox{
ClusterMetadata: s.clusterMetadata,
TaskScheduler: s.taskScheduler,
MetricsHandler: metrics.NoopMetricsHandler,
Logger: log.NewTestLogger(),
}
s.streamReceiver = NewStreamReceiver(
ProcessToolBox{
ClusterMetadata: s.clusterMetadata,
TaskScheduler: s.taskScheduler,
MetricsHandler: metrics.NoopMetricsHandler,
Logger: log.NewTestLogger(),
},
processToolBox,
NewExecutableTaskConverter(processToolBox),
NewClusterShardKey(rand.Int31(), rand.Int31()),
NewClusterShardKey(rand.Int31(), rand.Int31()),
)
Expand Down

0 comments on commit 6a5a86b

Please sign in to comment.