Skip to content

Commit

Permalink
Refactor ndc history resender to handle multiple remote clusters (#2866)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed May 19, 2022
1 parent b618a94 commit d36291f
Show file tree
Hide file tree
Showing 20 changed files with 178 additions and 150 deletions.
24 changes: 19 additions & 5 deletions common/xdc/nDCHistoryResender.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"context"
"time"

"go.temporal.io/server/client"
"go.temporal.io/server/common/persistence/serialization"

commonpb "go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -58,6 +59,7 @@ type (
NDCHistoryResender interface {
// SendSingleWorkflowHistory sends multiple run IDs's history events to remote
SendSingleWorkflowHistory(
remoteClusterName string,
namespaceID namespace.ID,
workflowID string,
runID string,
Expand All @@ -71,7 +73,7 @@ type (
// NDCHistoryResenderImpl is the implementation of NDCHistoryResender
NDCHistoryResenderImpl struct {
namespaceRegistry namespace.Registry
adminClient adminservice.AdminServiceClient
clientBean client.Bean
historyReplicationFn nDCHistoryReplicationFn
serializer serialization.Serializer
rereplicationTimeout dynamicconfig.DurationPropertyFnWithNamespaceIDFilter
Expand All @@ -91,7 +93,7 @@ const (
// NewNDCHistoryResender create a new NDCHistoryResenderImpl
func NewNDCHistoryResender(
namespaceRegistry namespace.Registry,
adminClient adminservice.AdminServiceClient,
clientBean client.Bean,
historyReplicationFn nDCHistoryReplicationFn,
serializer serialization.Serializer,
rereplicationTimeout dynamicconfig.DurationPropertyFnWithNamespaceIDFilter,
Expand All @@ -100,7 +102,7 @@ func NewNDCHistoryResender(

return &NDCHistoryResenderImpl{
namespaceRegistry: namespaceRegistry,
adminClient: adminClient,
clientBean: clientBean,
historyReplicationFn: historyReplicationFn,
serializer: serializer,
rereplicationTimeout: rereplicationTimeout,
Expand All @@ -110,6 +112,7 @@ func NewNDCHistoryResender(

// SendSingleWorkflowHistory sends one run IDs's history events to remote
func (n *NDCHistoryResenderImpl) SendSingleWorkflowHistory(
remoteClusterName string,
namespaceID namespace.ID,
workflowID string,
runID string,
Expand All @@ -131,13 +134,15 @@ func (n *NDCHistoryResenderImpl) SendSingleWorkflowHistory(

historyIterator := collection.NewPagingIterator(n.getPaginationFn(
ctx,
remoteClusterName,
namespaceID,
workflowID,
runID,
startEventID,
startEventVersion,
endEventID,
endEventVersion))
endEventVersion,
))

for historyIterator.HasNext() {
batch, err := historyIterator.Next()
Expand Down Expand Up @@ -172,6 +177,7 @@ func (n *NDCHistoryResenderImpl) SendSingleWorkflowHistory(

func (n *NDCHistoryResenderImpl) getPaginationFn(
ctx context.Context,
remoteClusterName string,
namespaceID namespace.ID,
workflowID string,
runID string,
Expand All @@ -185,6 +191,7 @@ func (n *NDCHistoryResenderImpl) getPaginationFn(

response, err := n.getHistory(
ctx,
remoteClusterName,
namespaceID,
workflowID,
runID,
Expand Down Expand Up @@ -244,6 +251,7 @@ func (n *NDCHistoryResenderImpl) sendReplicationRawRequest(

func (n *NDCHistoryResenderImpl) getHistory(
ctx context.Context,
remoteClusterName string,
namespaceID namespace.ID,
workflowID string,
runID string,
Expand All @@ -266,7 +274,13 @@ func (n *NDCHistoryResenderImpl) getHistory(

ctx, cancel := rpc.NewContextFromParentWithTimeoutAndHeaders(ctx, resendContextTimeout)
defer cancel()
response, err := n.adminClient.GetWorkflowExecutionRawHistoryV2(ctx, &adminservice.GetWorkflowExecutionRawHistoryV2Request{

adminClient, err := n.clientBean.GetRemoteAdminClient(remoteClusterName)
if err != nil {
return nil, err
}

response, err := adminClient.GetWorkflowExecutionRawHistoryV2(ctx, &adminservice.GetWorkflowExecutionRawHistoryV2Request{
Namespace: namespace.String(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: workflowID,
Expand Down
8 changes: 4 additions & 4 deletions common/xdc/nDCHistoryResender_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions common/xdc/nDCHistoryResender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"testing"
"time"

"go.temporal.io/server/common/persistence/serialization"

"github.com/golang/mock/gomock"
"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
Expand All @@ -45,10 +43,12 @@ import (
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/historyservicemock/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/client"
"go.temporal.io/server/common"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/primitives/timestamp"
serviceerrors "go.temporal.io/server/common/serviceerror"
)
Expand All @@ -61,6 +61,7 @@ type (
controller *gomock.Controller
mockClusterMetadata *cluster.MockMetadata
mockNamespaceCache *namespace.MockRegistry
mockClientBean *client.MockBean
mockAdminClient *adminservicemock.MockAdminServiceClient
mockHistoryClient *historyservicemock.MockHistoryServiceClient

Expand Down Expand Up @@ -91,10 +92,13 @@ func (s *nDCHistoryResenderSuite) SetupTest() {

s.controller = gomock.NewController(s.T())
s.mockClusterMetadata = cluster.NewMockMetadata(s.controller)
s.mockClientBean = client.NewMockBean(s.controller)
s.mockAdminClient = adminservicemock.NewMockAdminServiceClient(s.controller)
s.mockHistoryClient = historyservicemock.NewMockHistoryServiceClient(s.controller)
s.mockNamespaceCache = namespace.NewMockRegistry(s.controller)

s.mockClientBean.EXPECT().GetRemoteAdminClient(gomock.Any()).Return(s.mockAdminClient, nil).AnyTimes()

s.logger = log.NewTestLogger()
s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes()

Expand All @@ -118,7 +122,7 @@ func (s *nDCHistoryResenderSuite) SetupTest() {

s.rereplicator = NewNDCHistoryResender(
s.mockNamespaceCache,
s.mockAdminClient,
s.mockClientBean,
func(ctx context.Context, request *historyservice.ReplicateEventsV2Request) error {
_, err := s.mockHistoryClient.ReplicateEventsV2(ctx, request)
return err
Expand Down Expand Up @@ -219,6 +223,7 @@ func (s *nDCHistoryResenderSuite) TestSendSingleWorkflowHistory() {
}).Return(nil, nil).Times(2)

err := s.rereplicator.SendSingleWorkflowHistory(
cluster.TestCurrentClusterName,
s.namespaceID,
workflowID,
runID,
Expand Down Expand Up @@ -355,6 +360,7 @@ func (s *nDCHistoryResenderSuite) TestGetHistory() {

out, err := s.rereplicator.getHistory(
context.Background(),
cluster.TestCurrentClusterName,
s.namespaceID,
workflowID,
runID,
Expand Down
7 changes: 2 additions & 5 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1419,13 +1419,9 @@ func (adh *AdminHandler) ResendReplicationTasks(
if request == nil {
return nil, adh.error(errRequestNotSet, scope)
}
remoteClient, err := adh.clientBean.GetRemoteAdminClient(request.GetRemoteCluster())
if err != nil {
return nil, err
}
resender := xdc.NewNDCHistoryResender(
adh.namespaceRegistry,
remoteClient,
adh.clientBean,
func(ctx context.Context, request *historyservice.ReplicateEventsV2Request) error {
_, err1 := adh.historyClient.ReplicateEventsV2(ctx, request)
return err1
Expand All @@ -1435,6 +1431,7 @@ func (adh *AdminHandler) ResendReplicationTasks(
adh.logger,
)
if err := resender.SendSingleWorkflowHistory(
request.GetRemoteCluster(),
namespace.ID(request.GetNamespaceId()),
request.GetWorkflowId(),
request.GetRunId(),
Expand Down
4 changes: 3 additions & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"go.temporal.io/server/api/matchingservice/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
tokenspb "go.temporal.io/server/api/token/v1"
"go.temporal.io/server/client"
"go.temporal.io/server/common"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
Expand Down Expand Up @@ -119,6 +120,7 @@ type (
// NewEngineWithShardContext creates an instance of history engine
func NewEngineWithShardContext(
shard shard.Context,
clientBean client.Bean,
matchingClient matchingservice.MatchingServiceClient,
sdkClientFactory sdk.ClientFactory,
eventNotifier events.Notifier,
Expand Down Expand Up @@ -213,7 +215,7 @@ func NewEngineWithShardContext(
)

historyEngImpl.workflowTaskHandler = newWorkflowTaskHandlerCallback(historyEngImpl)
historyEngImpl.replicationDLQHandler = replication.NewLazyDLQHandler(shard, workflowDeleteManager, historyCache)
historyEngImpl.replicationDLQHandler = replication.NewLazyDLQHandler(shard, workflowDeleteManager, historyCache, clientBean)

return historyEngImpl
}
Expand Down
3 changes: 3 additions & 0 deletions service/history/historyEngineFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package history
import (
"go.uber.org/fx"

"go.temporal.io/server/client"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/sdk"
Expand All @@ -42,6 +43,7 @@ type (
HistoryEngineFactoryParams struct {
fx.In

ClientBean client.Bean
MatchingClient resource.MatchingClient
SdkClientFactory sdk.ClientFactory
EventNotifier events.Notifier
Expand All @@ -63,6 +65,7 @@ func (f *historyEngineFactory) CreateEngine(
) shard.Engine {
return NewEngineWithShardContext(
context,
f.ClientBean,
f.MatchingClient,
f.SdkClientFactory,
f.EventNotifier,
Expand Down
9 changes: 7 additions & 2 deletions service/history/queueProcessorFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.uber.org/fx"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/client"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
Expand Down Expand Up @@ -81,6 +82,7 @@ type (

SchedulerParams

ClientBean client.Bean
ArchivalClient archiver.Client
SdkClientFactory sdk.ClientFactory
MatchingClient resource.MatchingClient
Expand All @@ -92,6 +94,7 @@ type (

SchedulerParams

ClientBean client.Bean
ArchivalClient archiver.Client
MatchingClient resource.MatchingClient
}
Expand All @@ -108,6 +111,7 @@ type (
fx.In

Config *configs.Config
ClientBean client.Bean
ArchivalClient archiver.Client
EventSerializer serialization.Serializer
TaskFetcherFactory replication.TaskFetcherFactory
Expand Down Expand Up @@ -177,9 +181,9 @@ func (f *transferQueueProcessorFactory) CreateProcessor(
) queues.Processor {
return newTransferQueueProcessor(
shard,
engine,
workflowCache,
f.scheduler,
f.ClientBean,
f.ArchivalClient,
f.SdkClientFactory,
f.MatchingClient,
Expand Down Expand Up @@ -228,9 +232,9 @@ func (f *timerQueueProcessorFactory) CreateProcessor(
) queues.Processor {
return newTimerQueueProcessor(
shard,
engine,
workflowCache,
f.scheduler,
f.ClientBean,
f.ArchivalClient,
f.MatchingClient,
)
Expand Down Expand Up @@ -305,5 +309,6 @@ func (f *replicationQueueProcessorFactory) CreateProcessor(
shard,
f.TaskFetcherFactory,
workflowCache,
f.ClientBean,
)
}

0 comments on commit d36291f

Please sign in to comment.